You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/27 18:42:10 UTC
svn commit: r1377737 - in /oodt/trunk/workflow/src:
main/java/org/apache/oodt/cas/workflow/engine/
main/java/org/apache/oodt/cas/workflow/engine/processor/
main/java/org/apache/oodt/cas/workflow/instrepo/
main/java/org/apache/oodt/cas/workflow/lifecycl...
Author: mattmann
Date: Mon Aug 27 16:42:09 2012
New Revision: 1377737
URL: http://svn.apache.org/viewvc?rev=1377737&view=rev
Log:
- OODT-310 WIP:
- can actually execute a workflow task now using the AsynchronousLocalEngineRunner
- workflow states updating from Queued to Executed
- states being persisted to the Workflow Engine factory
- workflow properties updated with necessary properties and defaults to configure and execute
- object factories creating prioritizer and engine runners
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java
oodt/trunk/workflow/src/main/resources/workflow.properties
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Mon Aug 27 16:42:09 2012
@@ -17,29 +17,29 @@
package org.apache.oodt.cas.workflow.engine;
+//JDK imports
import java.net.URL;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Calendar;
import java.util.UUID;
-import java.util.Vector;
import java.util.logging.Logger;
+//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
+import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
import org.apache.oodt.cas.workflow.structs.HighestFIFOPrioritySorter;
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.Priority;
import org.apache.oodt.cas.workflow.structs.PrioritySorter;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
-import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
-import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
-import org.apache.oodt.commons.util.DateConvert;
+import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
/**
*
@@ -57,24 +57,26 @@ public class PrioritizedQueueBasedWorkfl
private final Thread queuerThread;
private final Thread runnerThread;
private final WorkflowInstanceRepository repo;
+ private final WorkflowRepository modelRepo;
+ private final WorkflowLifecycleManager lifecycle;
private final PrioritySorter prioritizer;
private WorkflowProcessorQueue processorQueue;
- private final URL wmgrUrl;
- private final long conditionWait;
+ private URL wmgrUrl;
private EngineRunner runner;
public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo,
- PrioritySorter prioritizer, long conditionWait) {
+ PrioritySorter prioritizer, WorkflowLifecycleManager lifecycle, EngineRunner runner, WorkflowRepository modelRepo) {
this.repo = repo;
this.prioritizer = prioritizer != null ? new HighestFIFOPrioritySorter(1,
50, 1) : prioritizer;
- this.wmgrUrl = null;
- this.conditionWait = conditionWait;
- this.processorQueue = new WorkflowProcessorQueue();
+ this.lifecycle = lifecycle;
+ this.modelRepo = modelRepo;
+ this.processorQueue = new WorkflowProcessorQueue(repo, lifecycle, modelRepo);
+ this.runner = runner;
// Task QUEUER thread
- TaskQuerier querier = new TaskQuerier(processorQueue, this.prioritizer);
+ TaskQuerier querier = new TaskQuerier(processorQueue, this.prioritizer, this.repo);
queuerThread = new Thread(querier);
queuerThread.start();
@@ -106,9 +108,31 @@ public class PrioritizedQueueBasedWorkfl
// create a new WorkflowProcessor around it
// set it in Queued status
// commit it to workflow instance repo and it will get picked up
- // by the runner thread
+
+ WorkflowInstance inst = new WorkflowInstance();
+ inst.setParentChildWorkflow(workflow instanceof ParentChildWorkflow ?
+ (ParentChildWorkflow)workflow:new ParentChildWorkflow(workflow));
+ inst.setStartDate(Calendar.getInstance().getTime());
+ inst.setCurrentTaskId(workflow.getTasks().get(0).getTaskId());
+ inst.setId(UUID.randomUUID().toString());
+ inst.setSharedContext(metadata);
+ inst.setPriority(Priority.getDefault());
+ WorkflowLifecycle cycle =
+ lifecycle.getLifecycleForWorkflow(workflow) != null ?
+ lifecycle.getLifecycleForWorkflow(workflow):
+ lifecycle.getDefaultLifecycle();
+ WorkflowState state = cycle.getStateByName("Queued");
+ state.setMessage("Workflow started and Queued.");
+ inst.setState(state);
+ System.out.println("CATEGORY NAME: ["+inst.getState().getCategory().getName()+"]");
+ try {
+ this.repo.addWorkflowInstance(inst);
+ } catch (InstanceRepositoryException e) {
+ e.printStackTrace();
+ throw new EngineException(e.getMessage());
+ }
- return null;
+ return inst;
}
/*
@@ -183,7 +207,7 @@ public class PrioritizedQueueBasedWorkfl
*/
@Override
public void setWorkflowManagerUrl(URL url) {
- // TODO Auto-generated method stub
+ this.wmgrUrl = url;
}
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java?rev=1377737&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java Mon Aug 27 16:42:09 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.util.PathUtils;
+import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
+/**
+ *
+ * Constructs an instance of the {@link PrioritizedQueueBasedWorkflowEngine},
+ * based on its constituent instance repository, workflow task prioritizer,
+ * workflow lifecycle, and engine runner.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class PrioritizedQueueBasedWorkflowEngineFactory implements
+ WorkflowEngineFactory {
+
+ private static final String MODEL_REPO_FACTORY_PROPERTY = "workflow.repo.factory";
+
+ private static final String INSTANCE_REPO_FACTORY_PROPERTY = "workflow.engine.instanceRep.factory";
+
+ private static final String PRIORITIZER_CLASS_PROPERTY = "org.apache.oodt.cas.workflow.wengine.prioritizer";
+
+ private static final String LIFECYCLES_FILE_PATH_PROPERTY = "org.apache.oodt.cas.workflow.lifecycle.filePath";
+
+ private static final String ENGINE_RUNNER_CLASS = "workflow.wengine.runner.factory";
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngineFactory#createWorkflowEngine
+ * ()
+ */
+ @Override
+ public WorkflowEngine createWorkflowEngine() {
+ try {
+ return new PrioritizedQueueBasedWorkflowEngine(
+ getWorkflowInstanceRepository(), getPrioritizer(),
+ getWorkflowLifecycle(), getEngineRunner(), getModelRepository());
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ protected WorkflowRepository getModelRepository() {
+ return GenericWorkflowObjectFactory
+ .getWorkflowRepositoryFromClassName(System
+ .getProperty(MODEL_REPO_FACTORY_PROPERTY));
+ }
+
+ protected EngineRunner getEngineRunner() {
+ return GenericWorkflowObjectFactory.getEngineRunnerFromClassName(System
+ .getProperty(ENGINE_RUNNER_CLASS));
+ }
+
+ protected WorkflowLifecycleManager getWorkflowLifecycle()
+ throws InstantiationException {
+ return new WorkflowLifecycleManager(PathUtils.replaceEnvVariables(System
+ .getProperty(LIFECYCLES_FILE_PATH_PROPERTY)));
+ }
+
+ protected PrioritySorter getPrioritizer() {
+ return GenericWorkflowObjectFactory.getPrioritySorterFromClassName(System
+ .getProperty(PRIORITIZER_CLASS_PROPERTY));
+
+ }
+
+ protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
+ return GenericWorkflowObjectFactory
+ .getWorkflowInstanceRepositoryFromClassName(System
+ .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
+ }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Mon Aug 27 16:42:09 2012
@@ -27,7 +27,9 @@ import java.util.logging.Logger;
import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
import org.apache.oodt.cas.workflow.structs.PrioritySorter;
/**
@@ -51,8 +53,11 @@ public class TaskQuerier implements Runn
private List<WorkflowProcessor> runnableProcessors;
private PrioritySorter prioritizer;
-
- private static final Logger LOG = Logger.getLogger(TaskQuerier.class.getName());
+
+ private WorkflowInstanceRepository repo;
+
+ private static final Logger LOG = Logger.getLogger(TaskQuerier.class
+ .getName());
/**
* Constructs a new TaskQuerier with the given {@link WorkflowProcessorQueue},
@@ -63,13 +68,18 @@ public class TaskQuerier implements Runn
* The associated set of queued Workflow Tasks.
* @param prioritizer
* The prioritizer to use to sort the ready-to-run Workflow Tasks.
+ *
+ * @param repo
+ * The {@link WorkflowInstanceRepository} to save the state of
+ * WorkflowInstances.
*/
public TaskQuerier(WorkflowProcessorQueue processorQueue,
- PrioritySorter prioritizer) {
+ PrioritySorter prioritizer, WorkflowInstanceRepository repo) {
this.running = true;
this.processorQueue = processorQueue;
this.runnableProcessors = new Vector<WorkflowProcessor>();
this.prioritizer = prioritizer;
+ this.repo = repo;
}
/**
@@ -93,18 +103,32 @@ public class TaskQuerier implements Runn
WorkflowLifecycle lifecycle = getLifecycleForProcessor(processor);
if (!(processor.getState().getCategory().getName().equals("done") || processor
.getState().getCategory().getName().equals("holding"))) {
- for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
- tp.setState(lifecycle.createState("Executing", "running",
- "Added to Runnable queue"));
- LOG.log(Level.INFO, "Added processor with priority: ["+tp.getPriority()+"]");
- processorsToRun.add(tp);
- }
-
- prioritizer.sort(processorsToRun);
-
- synchronized(runnableProcessors){
- if(running) runnableProcessors = processorsToRun;
+ for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
+ WorkflowState state = lifecycle.createState("Executing", "running",
+ "Added to Runnable queue");
+ tp.setState(state);
+ tp.getWorkflowInstance().setState(state);
+ if (this.repo != null) {
+ try {
+ this.repo.updateWorkflowInstance(tp.getWorkflowInstance());
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING, "Unable to update workflow instance: ["
+ + tp.getWorkflowInstance().getId()
+ + "] status to Executing. Message: " + e.getMessage());
+ }
}
+ LOG.log(Level.INFO,
+ "Added processor with priority: [" + tp.getPriority() + "]");
+ processorsToRun.add(tp);
+ }
+
+ prioritizer.sort(processorsToRun);
+
+ synchronized (runnableProcessors) {
+ if (running)
+ runnableProcessors = processorsToRun;
+ }
} else {
continue;
@@ -134,18 +158,19 @@ public class TaskQuerier implements Runn
public List<WorkflowProcessor> getRunnableProcessors() {
return runnableProcessors;
}
-
+
/**
- * Gets the next available {@link TaskProcessor} from the {@link List}
- * of {@link #runnableProcessors}. Removes that {@link TaskProcessor}
- * from the actual {@link #runnableProcessors} {@link List}.
+ * Gets the next available {@link TaskProcessor} from the {@link List} of
+ * {@link #runnableProcessors}. Removes that {@link TaskProcessor} from the
+ * actual {@link #runnableProcessors} {@link List}.
*
- * @return The next available {@link TaskProcessor} from the {@link List}
- * of {@link #runnableProcessors}.
+ * @return The next available {@link TaskProcessor} from the {@link List} of
+ * {@link #runnableProcessors}.
*/
- public TaskProcessor getNext(){
- if(getRunnableProcessors().size() == 0) return null;
- return (TaskProcessor)getRunnableProcessors().remove(0);
+ public TaskProcessor getNext() {
+ if (getRunnableProcessors().size() == 0)
+ return null;
+ return (TaskProcessor) getRunnableProcessors().remove(0);
}
private WorkflowLifecycle getLifecycleForProcessor(WorkflowProcessor processor) {
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java Mon Aug 27 16:42:09 2012
@@ -19,8 +19,26 @@ package org.apache.oodt.cas.workflow.eng
//JDK imports
import java.util.List;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.engine.TaskQuerier;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
+import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
+import org.apache.oodt.cas.workflow.structs.Graph;
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.exceptions.RepositoryException;
/**
*
@@ -33,13 +51,125 @@ import org.apache.oodt.cas.workflow.engi
*/
public class WorkflowProcessorQueue {
+ private static final Logger LOG = Logger
+ .getLogger(WorkflowProcessorQueue.class.getName());
+
+ private WorkflowInstanceRepository repo;
+
+ private WorkflowRepository modelRepo;
+
+ private WorkflowLifecycleManager lifecycle;
+
+ public WorkflowProcessorQueue(WorkflowInstanceRepository repo,
+ WorkflowLifecycleManager lifecycle, WorkflowRepository modelRepo) {
+ this.repo = repo;
+ this.lifecycle = lifecycle;
+ this.modelRepo = modelRepo;
+ }
+
/**
* Should return the list of available, Queued, {@link WorkflowProcessor}s.
*
* @return the list of available, Queued, {@link WorkflowProcessor}s.
*/
public synchronized List<WorkflowProcessor> getProcessors() {
- return null;
+ WorkflowInstancePage page = null;
+ try {
+ page = repo.getPagedWorkflows(1, "Queued");
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING, "Unable to load workflow processors: Message: "
+ + e.getMessage());
+ return null;
+ }
+
+ List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>(
+ page.getPageSize());
+ for (WorkflowInstance inst : (List<WorkflowInstance>) (List<?>) page
+ .getPageWorkflows()) {
+ if ((inst.getState() == null)
+ || (inst.getState() != null && inst.getState().getCategory() == null)) {
+ WorkflowLifecycle cycle = getLifecycle(inst.getWorkflow());
+ System.out.println("I AM SETTING the state to get state by name: ["
+ + inst.getStatus() + "]");
+ WorkflowState state = cycle.getStateByName(inst.getStatus());
+ state.setMessage("Queued by WorkflowProcessorQueue.");
+ inst.setState(state);
+ try {
+ this.repo.updateWorkflowInstance(inst);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.log(
+ Level.WARNING,
+ "Unable to update workflow instance: [" + inst.getId()
+ + "] with status: [" + inst.getStatus() + "]: Message: "
+ + e.getMessage());
+ }
+
+ } else {
+ System.out.println("NOT CHANGING state: info for inst: "
+ + inst.getState());
+ }
+ processors.add(fromWorkflowInstance(inst));
+ }
+
+ return processors;
+ }
+
+ private WorkflowProcessor fromWorkflowInstance(WorkflowInstance inst) {
+ WorkflowProcessor processor = null;
+ if (inst.getParentChildWorkflow().getTasks() != null
+ && inst.getParentChildWorkflow().getTasks().size() > 1) {
+ processor = new SequentialProcessor();
+ processor.setExecutionType("sequential");
+ processor.setWorkflowInstance(inst);
+
+ for (WorkflowTask task : inst.getParentChildWorkflow().getTasks()) {
+ WorkflowInstance instance = new WorkflowInstance();
+ instance.setState(inst.getState());
+ instance.setCurrentTaskId(task.getTaskId());
+ ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
+ workflow.setId("task-workflow-"+UUID.randomUUID().toString());
+ workflow.setName("Task Workflow-"+task.getTaskName());
+ workflow.getTasks().add(task);
+ workflow.getGraph().setTask(task);
+ instance.setId(UUID.randomUUID().toString());
+ instance.setParentChildWorkflow(workflow);
+ if(modelRepo != null){
+ try {
+ modelRepo.addWorkflow(workflow);
+ } catch (RepositoryException e) {
+ e.printStackTrace();
+ }
+ }
+
+ WorkflowProcessor subProcessor = fromWorkflowInstance(instance);
+ processor.getSubProcessors().add(subProcessor);
+ }
+ processor.setState(inst.getState());
+ }
+ else{
+ processor = new TaskProcessor();
+ processor.setExecutionType("task");
+ processor.setWorkflowInstance(inst);
+ processor.setState(inst.getState());
+ }
+
+ processor.setConditionProcessor(false);
+ processor.setDynamicMetadata(inst.getSharedContext());
+ processor.setPriority(inst.getPriority());
+ System.out.println("processor state: [" + processor.getState() + "]");
+ ProcessorDateTimeInfo dateTimeInfo = new ProcessorDateTimeInfo();
+ processor.setProcessorDateTimeInfo(dateTimeInfo);
+ processor.setStaticMetadata(new Metadata());
+ processor.setLifecycleManager(lifecycle);
+
+ return processor;
+ }
+
+ private WorkflowLifecycle getLifecycle(Workflow workflow) {
+ return lifecycle.getLifecycleForWorkflow(workflow) != null ? lifecycle
+ .getLifecycleForWorkflow(workflow) : lifecycle.getDefaultLifecycle();
}
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java Mon Aug 27 16:42:09 2012
@@ -20,6 +20,8 @@ package org.apache.oodt.cas.workflow.ins
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleStage;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
import org.apache.oodt.cas.workflow.structs.Priority;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
@@ -31,6 +33,7 @@ import org.apache.oodt.cas.workflow.stru
//JDK imports
import java.io.File;
import java.io.IOException;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
@@ -479,8 +482,32 @@ public class LuceneWorkflowInstanceRepos
// store the workflow instance info first
doc.add(new Field("workflow_inst_id", workflowInst.getId(),
Field.Store.YES, Field.Index.UN_TOKENIZED));
+
+ // will leave this for back compat, but will also store
+ // category
doc.add(new Field("workflow_inst_status", workflowInst.getStatus(),
Field.Store.YES, Field.Index.UN_TOKENIZED));
+
+ if(workflowInst.getState() != null){
+ WorkflowState state = workflowInst.getState();
+
+ if(state.getDescription() != null){
+ doc.add(new Field("workflow_inst_state_desc",
+ state.getDescription(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ }
+
+ if(state.getMessage() != null){
+ doc.add(new Field("workflow_inst_state_message",
+ state.getMessage(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ }
+
+ if(state.getCategory() != null && state.getCategory().getName() != null){
+ doc.add(new Field("workflow_inst_state_category",
+ state.getCategory().getName(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ System.out.println("Indexing category: ["+state.getCategory().getName()+"]");
+ }
+ }
+
doc
.add(new Field("workflow_inst_current_task_id", workflowInst
.getCurrentTaskId(), Field.Store.YES,
@@ -623,7 +650,25 @@ public class LuceneWorkflowInstanceRepos
// first read all the instance info
inst.setId(doc.get("workflow_inst_id"));
- inst.setStatus(doc.get("workflow_inst_status"));
+
+ // try and construct a state
+ WorkflowState state = new WorkflowState();
+ state.setName(doc.get("workflow_inst_status"));
+ if(doc.get("workflow_inst_state_category") != null){
+ WorkflowLifecycleStage category = new WorkflowLifecycleStage();
+ category.setName(doc.get("workflow_inst_state_category"));
+ System.out.println("unserializing category: ["+category.getName()+"]");
+ state.setCategory(category);
+ }
+
+ if(doc.get("workflow_inst_state_desc") != null){
+ state.setDescription(doc.get("workflow_inst_state_desc"));
+ }
+
+ if(doc.get("workflow_inst_state_message") != null){
+ state.setMessage(doc.get("workflow_inst_state_message"));
+ }
+ inst.setState(state);
inst.setCurrentTaskId(doc.get("workflow_inst_current_task_id"));
inst.setCurrentTaskStartDateTimeIsoStr(doc
.get("workflow_inst_currenttask_startdatetime"));
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java Mon Aug 27 16:42:09 2012
@@ -227,7 +227,7 @@ public class WorkflowLifecycle {
if (stage.getStates() != null) {
for (WorkflowState state : (List<WorkflowState>) stage.getStates()) {
if (state.getName().equals(stateName)) {
- return state;
+ return makeCopy(state);
}
}
}
@@ -257,4 +257,17 @@ public class WorkflowLifecycle {
return state;
}
+ private WorkflowState makeCopy(WorkflowState state){
+ WorkflowState newState = new WorkflowState();
+ newState.setCategory(state.getCategory());
+ newState.setDescription(state.getDescription());
+ newState.setMessage(state.getMessage());
+ newState.setName(state.getName());
+ newState.setPrevState(state.getPrevState());
+ newState.setStartTime(state.getStartTime());
+ newState.setSubStates(state.getSubStates());
+ return newState;
+ }
+
+
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java Mon Aug 27 16:42:09 2012
@@ -40,6 +40,7 @@ public class ParentChildWorkflow extends
}
public ParentChildWorkflow(Graph graph) {
+ super();
this.graph = graph;
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java Mon Aug 27 16:42:09 2012
@@ -25,6 +25,7 @@ import org.apache.oodt.cas.workflow.inst
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepositoryFactory;
import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
import org.apache.oodt.cas.workflow.repository.WorkflowRepositoryFactory;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
@@ -339,6 +340,34 @@ public final class GenericWorkflowObject
} else
return null;
}
+
+ public static PrioritySorter getPrioritySorterFromClassName(String className){
+ if(className != null){
+ try{
+ Class<PrioritySorter> sorterClass = (Class<PrioritySorter>)Class.forName(className);
+ return sorterClass.newInstance();
+ }
+ catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING, "Unable to locate workflow prioritizer class: "
+ + className + ": cannot instantiate!");
+ return null;
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "Unable to instantiate workflow prioritizer class: " + className
+ + ": Reason: " + e.getMessage() + " !");
+ return null;
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ LOG.log(Level.WARNING,
+ "IllegalAccessException when instantiating workflow prioritizer class: "
+ + className + ": cannot instantiate!");
+ return null;
+ }
+ }
+ else return null;
+ }
public static List copyWorkflows(List workflows){
if(workflows != null){
Modified: oodt/trunk/workflow/src/main/resources/workflow.properties
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/resources/workflow.properties?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/resources/workflow.properties (original)
+++ oodt/trunk/workflow/src/main/resources/workflow.properties Mon Aug 27 16:42:09 2012
@@ -24,6 +24,9 @@ workflow.engine.factory = org.apache.ood
# workflow instance repository factory
workflow.engine.instanceRep.factory = org.apache.oodt.cas.workflow.instrepo.LuceneWorkflowInstanceRepositoryFactory
+# engine runner factory
+workflow.wengine.runner.factory=org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunnerFactory
+
# thread pool workflow engine properties
org.apache.oodt.cas.workflow.engine.queueSize=
org.apache.oodt.cas.workflow.engine.maxPoolSize=
@@ -32,6 +35,13 @@ org.apache.oodt.cas.workflow.engine.thre
org.apache.oodt.cas.workflow.engine.unlimitedQueue=true
org.apache.oodt.cas.workflow.engine.preConditionWaitTime=10
+# wengine properties
+# define workflow prioritizer class to use for sorting workflow tasks
+org.apache.oodt.cas.workflow.wengine.prioritizer=org.apache.oodt.cas.workflow.structs.FILOPrioritySorter
+
+# the maximum number of threads to be used by the asynchronous engine runner
+org.apache.oodt.cas.workflow.engine.asynchronous.runner.num.threads=25
+
# set this if you want the workflow manager to submit jobs through the resource mgr
org.apache.oodt.cas.workflow.engine.resourcemgr.url=
@@ -68,3 +78,7 @@ org.apache.oodt.cas.workflow.repo.dataso
# Spring command line option and action store properties
org.apache.oodt.cas.cli.action.spring.config=src/main/resources/cmd-line-actions.xml
org.apache.oodt.cas.cli.option.spring.config=src/main/resources/cmd-line-options.xml
+
+# Workflow Lifecycle Manager
+org.apache.oodt.cas.workflow.lifecycle.filePath=/path/to/workflow-lifecycle.xml
+
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java Mon Aug 27 16:42:09 2012
@@ -45,7 +45,7 @@ public class MetSetterTaskQuerier extend
*/
public MetSetterTaskQuerier(WorkflowProcessorQueue processorQueue,
PrioritySorter prioritizer) {
- super(processorQueue, prioritizer);
+ super(processorQueue, prioritizer, null);
}
/*
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java Mon Aug 27 16:42:09 2012
@@ -39,6 +39,7 @@ public class MockProcessorQueue extends
private boolean consumed;
public MockProcessorQueue() {
+ super(null,null,null);
this.utils = new QuerierAndRunnerUtils();
this.consumed = false;
}
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java Mon Aug 27 16:42:09 2012
@@ -50,7 +50,7 @@ public class TestTaskQuerier extends Tes
assertNotNull(queued = processorQueue.getProcessors());
assertEquals(3, queued.size());
processorQueue = new MockProcessorQueue();
- TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+ TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer, null);
Thread querierThread = new Thread(querier);
querierThread.start();
List<WorkflowProcessor> runnables = null;
@@ -74,7 +74,7 @@ public class TestTaskQuerier extends Tes
assertNotNull(queued = processorQueue.getProcessors());
assertEquals(3, queued.size());
processorQueue = new MockProcessorQueue();
- TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+ TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer, null);
Thread querierThread = new Thread(querier);
querierThread.start();
List<WorkflowProcessor> runnables = null;