You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/27 18:42:10 UTC

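svn commit: r1377737 - in /oodt/trunk/workflow/src: main/java/org/apache/oodt/cas/workflow/engine/ main/java/org/apache/oodt/cas/workflow/engine/processor/ main/java/org/apache/oodt/cas/workflow/instrepo/ main/java/org/apache/oodt/cas/workflow/lifecycle/ main/java/org/apache/oodt/cas/workflow/structs/ main/java/org/apache/oodt/cas/workflow/util/ main/resources/ test/org/apache/oodt/cas/workflow/engine/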

Author: mattmann
Date: Mon Aug 27 16:42:09 2012
New Revision: 1377737

URL: http://svn.apache.org/viewvc?rev=1377737&view=rev
Log:
- OODT-310 WIP:
  - can actually execute a workflow task now using the AsynchronousLocalEngineRunner
  - workflow states updating from Queued to Executing
  - states being persisted to the workflow instance repository
  - workflow properties updated with necessary properties and defaults to configure and execute
  - object factories creating prioritizer and engine runners
  

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java
    oodt/trunk/workflow/src/main/resources/workflow.properties
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Mon Aug 27 16:42:09 2012
@@ -17,29 +17,29 @@
 
 package org.apache.oodt.cas.workflow.engine;
 
+//JDK imports
 import java.net.URL;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Calendar;
 import java.util.UUID;
-import java.util.Vector;
 import java.util.logging.Logger;
 
+//OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
 import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
 import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
+import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
 import org.apache.oodt.cas.workflow.structs.HighestFIFOPrioritySorter;
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.Priority;
 import org.apache.oodt.cas.workflow.structs.PrioritySorter;
 import org.apache.oodt.cas.workflow.structs.Workflow;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
-import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
-import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
-import org.apache.oodt.commons.util.DateConvert;
+import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
 
 /**
  * 
@@ -57,24 +57,26 @@ public class PrioritizedQueueBasedWorkfl
   private final Thread queuerThread;
   private final Thread runnerThread;
   private final WorkflowInstanceRepository repo;
+  private final WorkflowRepository modelRepo;
+  private final WorkflowLifecycleManager lifecycle;
   private final PrioritySorter prioritizer;
   private WorkflowProcessorQueue processorQueue;
-  private final URL wmgrUrl;
-  private final long conditionWait;
+  private URL wmgrUrl;
   private EngineRunner runner;
 
   public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo,
-      PrioritySorter prioritizer, long conditionWait) {
+      PrioritySorter prioritizer, WorkflowLifecycleManager lifecycle, EngineRunner runner, WorkflowRepository modelRepo) {
     this.repo = repo;
     this.prioritizer = prioritizer != null ? new HighestFIFOPrioritySorter(1,
         50, 1) : prioritizer;
-    this.wmgrUrl = null;
-    this.conditionWait = conditionWait;
-    this.processorQueue = new WorkflowProcessorQueue();
+    this.lifecycle = lifecycle;
+    this.modelRepo = modelRepo;
+    this.processorQueue = new WorkflowProcessorQueue(repo, lifecycle, modelRepo);
+    this.runner = runner;
 
 
     // Task QUEUER thread
-    TaskQuerier querier = new TaskQuerier(processorQueue, this.prioritizer);
+    TaskQuerier querier = new TaskQuerier(processorQueue, this.prioritizer, this.repo);
     queuerThread = new Thread(querier);
     queuerThread.start();
 
@@ -106,9 +108,31 @@ public class PrioritizedQueueBasedWorkfl
     // create a new WorkflowProcessor around it
     // set it in Queued status
     // commit it to workflow instance repo and it will get picked up 
-    // by the runner thread
+        
+    WorkflowInstance inst = new WorkflowInstance();
+    inst.setParentChildWorkflow(workflow instanceof ParentChildWorkflow ? 
+        (ParentChildWorkflow)workflow:new ParentChildWorkflow(workflow));
+    inst.setStartDate(Calendar.getInstance().getTime());
+    inst.setCurrentTaskId(workflow.getTasks().get(0).getTaskId());
+    inst.setId(UUID.randomUUID().toString());
+    inst.setSharedContext(metadata);
+    inst.setPriority(Priority.getDefault());
+    WorkflowLifecycle cycle = 
+      lifecycle.getLifecycleForWorkflow(workflow) != null ? 
+          lifecycle.getLifecycleForWorkflow(workflow):
+            lifecycle.getDefaultLifecycle();
+    WorkflowState state = cycle.getStateByName("Queued");
+    state.setMessage("Workflow started and Queued.");
+    inst.setState(state);  
+    System.out.println("CATEGORY NAME: ["+inst.getState().getCategory().getName()+"]");
+    try {
+      this.repo.addWorkflowInstance(inst);
+    } catch (InstanceRepositoryException e) {
+      e.printStackTrace();
+      throw new EngineException(e.getMessage());
+    }
     
-    return null;
+    return inst;
   }
 
   /*
@@ -183,7 +207,7 @@ public class PrioritizedQueueBasedWorkfl
    */
   @Override
   public void setWorkflowManagerUrl(URL url) {
-    // TODO Auto-generated method stub
+    this.wmgrUrl = url;
 
   }
 

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java?rev=1377737&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngineFactory.java Mon Aug 27 16:42:09 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.util.PathUtils;
+import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
+/**
+ * 
+ * Constructs an instance of the {@link PrioritizedQueueBasedWorkflowEngine},
+ * based on its constituent instance repository, workflow task prioritizer,
+ * workflow lifecycle, and engine runner.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class PrioritizedQueueBasedWorkflowEngineFactory implements
+    WorkflowEngineFactory {
+
+  private static final String MODEL_REPO_FACTORY_PROPERTY = "workflow.repo.factory";
+
+  private static final String INSTANCE_REPO_FACTORY_PROPERTY = "workflow.engine.instanceRep.factory";
+
+  private static final String PRIORITIZER_CLASS_PROPERTY = "org.apache.oodt.cas.workflow.wengine.prioritizer";
+
+  private static final String LIFECYCLES_FILE_PATH_PROPERTY = "org.apache.oodt.cas.workflow.lifecycle.filePath";
+
+  private static final String ENGINE_RUNNER_CLASS = "workflow.wengine.runner.factory";
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngineFactory#createWorkflowEngine
+   * ()
+   */
+  @Override
+  public WorkflowEngine createWorkflowEngine() {
+    try {
+      return new PrioritizedQueueBasedWorkflowEngine(
+          getWorkflowInstanceRepository(), getPrioritizer(),
+          getWorkflowLifecycle(), getEngineRunner(), getModelRepository());
+    } catch (Exception e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  protected WorkflowRepository getModelRepository() {
+    return GenericWorkflowObjectFactory
+        .getWorkflowRepositoryFromClassName(System
+            .getProperty(MODEL_REPO_FACTORY_PROPERTY));
+  }
+
+  protected EngineRunner getEngineRunner() {
+    return GenericWorkflowObjectFactory.getEngineRunnerFromClassName(System
+        .getProperty(ENGINE_RUNNER_CLASS));
+  }
+
+  protected WorkflowLifecycleManager getWorkflowLifecycle()
+      throws InstantiationException {
+    return new WorkflowLifecycleManager(PathUtils.replaceEnvVariables(System
+        .getProperty(LIFECYCLES_FILE_PATH_PROPERTY)));
+  }
+
+  protected PrioritySorter getPrioritizer() {
+    return GenericWorkflowObjectFactory.getPrioritySorterFromClassName(System
+        .getProperty(PRIORITIZER_CLASS_PROPERTY));
+
+  }
+
+  protected WorkflowInstanceRepository getWorkflowInstanceRepository() {
+    return GenericWorkflowObjectFactory
+        .getWorkflowInstanceRepositoryFromClassName(System
+            .getProperty(INSTANCE_REPO_FACTORY_PROPERTY));
+  }
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Mon Aug 27 16:42:09 2012
@@ -27,7 +27,9 @@ import java.util.logging.Logger;
 import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
 import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
 import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
 import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
 import org.apache.oodt.cas.workflow.structs.PrioritySorter;
 
 /**
@@ -51,8 +53,11 @@ public class TaskQuerier implements Runn
   private List<WorkflowProcessor> runnableProcessors;
 
   private PrioritySorter prioritizer;
-  
-  private static final Logger LOG = Logger.getLogger(TaskQuerier.class.getName());
+
+  private WorkflowInstanceRepository repo;
+
+  private static final Logger LOG = Logger.getLogger(TaskQuerier.class
+      .getName());
 
   /**
    * Constructs a new TaskQuerier with the given {@link WorkflowProcessorQueue},
@@ -63,13 +68,18 @@ public class TaskQuerier implements Runn
    *          The associated set of queued Workflow Tasks.
    * @param prioritizer
    *          The prioritizer to use to sort the ready-to-run Workflow Tasks.
+   * 
+   * @param repo
+   *          The {@link WorkflowInstanceRepository} to save the state of
+   *          WorkflowInstances.
    */
   public TaskQuerier(WorkflowProcessorQueue processorQueue,
-      PrioritySorter prioritizer) {
+      PrioritySorter prioritizer, WorkflowInstanceRepository repo) {
     this.running = true;
     this.processorQueue = processorQueue;
     this.runnableProcessors = new Vector<WorkflowProcessor>();
     this.prioritizer = prioritizer;
+    this.repo = repo;
   }
 
   /**
@@ -93,18 +103,32 @@ public class TaskQuerier implements Runn
         WorkflowLifecycle lifecycle = getLifecycleForProcessor(processor);
         if (!(processor.getState().getCategory().getName().equals("done") || processor
             .getState().getCategory().getName().equals("holding"))) {
-            for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
-              tp.setState(lifecycle.createState("Executing", "running",
-                  "Added to Runnable queue"));
-              LOG.log(Level.INFO, "Added processor with priority: ["+tp.getPriority()+"]");
-              processorsToRun.add(tp);
-            }
-            
-            prioritizer.sort(processorsToRun);
-            
-            synchronized(runnableProcessors){
-              if(running) runnableProcessors = processorsToRun;
+          for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
+            WorkflowState state = lifecycle.createState("Executing", "running",
+                "Added to Runnable queue");
+            tp.setState(state);
+            tp.getWorkflowInstance().setState(state);
+            if (this.repo != null) {
+              try {
+                this.repo.updateWorkflowInstance(tp.getWorkflowInstance());
+              } catch (Exception e) {
+                e.printStackTrace();
+                LOG.log(Level.WARNING, "Unable to update workflow instance: ["
+                    + tp.getWorkflowInstance().getId()
+                    + "] status to Executing. Message: " + e.getMessage());
+              }
             }
+            LOG.log(Level.INFO,
+                "Added processor with priority: [" + tp.getPriority() + "]");
+            processorsToRun.add(tp);
+          }
+
+          prioritizer.sort(processorsToRun);
+
+          synchronized (runnableProcessors) {
+            if (running)
+              runnableProcessors = processorsToRun;
+          }
 
         } else {
           continue;
@@ -134,18 +158,19 @@ public class TaskQuerier implements Runn
   public List<WorkflowProcessor> getRunnableProcessors() {
     return runnableProcessors;
   }
-  
+
   /**
-   * Gets the next available {@link TaskProcessor} from the {@link List}
-   * of {@link #runnableProcessors}. Removes that {@link TaskProcessor}
-   * from the actual {@link #runnableProcessors} {@link List}.
+   * Gets the next available {@link TaskProcessor} from the {@link List} of
+   * {@link #runnableProcessors}. Removes that {@link TaskProcessor} from the
+   * actual {@link #runnableProcessors} {@link List}.
    * 
-   * @return The next available {@link TaskProcessor} from the {@link List}
-   * of {@link #runnableProcessors}.
+   * @return The next available {@link TaskProcessor} from the {@link List} of
+   *         {@link #runnableProcessors}.
    */
-  public TaskProcessor getNext(){
-    if(getRunnableProcessors().size() == 0) return null;
-    return (TaskProcessor)getRunnableProcessors().remove(0);
+  public TaskProcessor getNext() {
+    if (getRunnableProcessors().size() == 0)
+      return null;
+    return (TaskProcessor) getRunnableProcessors().remove(0);
   }
 
   private WorkflowLifecycle getLifecycleForProcessor(WorkflowProcessor processor) {

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/processor/WorkflowProcessorQueue.java Mon Aug 27 16:42:09 2012
@@ -19,8 +19,26 @@ package org.apache.oodt.cas.workflow.eng
 
 //JDK imports
 import java.util.List;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.workflow.engine.TaskQuerier;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
+import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
+import org.apache.oodt.cas.workflow.structs.Graph;
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.exceptions.RepositoryException;
 
 /**
  * 
@@ -33,13 +51,125 @@ import org.apache.oodt.cas.workflow.engi
  */
 public class WorkflowProcessorQueue {
 
+  private static final Logger LOG = Logger
+      .getLogger(WorkflowProcessorQueue.class.getName());
+
+  private WorkflowInstanceRepository repo;
+  
+  private WorkflowRepository modelRepo;
+
+  private WorkflowLifecycleManager lifecycle;
+
+  public WorkflowProcessorQueue(WorkflowInstanceRepository repo,
+      WorkflowLifecycleManager lifecycle, WorkflowRepository modelRepo) {
+    this.repo = repo;
+    this.lifecycle = lifecycle;
+    this.modelRepo = modelRepo;
+  }
+
   /**
    * Should return the list of available, Queued, {@link WorkflowProcessor}s.
    * 
    * @return the list of available, Queued, {@link WorkflowProcessor}s.
    */
   public synchronized List<WorkflowProcessor> getProcessors() {
-    return null;
+    WorkflowInstancePage page = null;
+    try {
+      page = repo.getPagedWorkflows(1, "Queued");
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.log(Level.WARNING, "Unable to load workflow processors: Message: "
+          + e.getMessage());
+      return null;
+    }
+
+    List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>(
+        page.getPageSize());
+    for (WorkflowInstance inst : (List<WorkflowInstance>) (List<?>) page
+        .getPageWorkflows()) {
+      if ((inst.getState() == null)
+          || (inst.getState() != null && inst.getState().getCategory() == null)) {
+        WorkflowLifecycle cycle = getLifecycle(inst.getWorkflow());
+        System.out.println("I AM SETTING the state to get state by name: ["
+            + inst.getStatus() + "]");
+        WorkflowState state = cycle.getStateByName(inst.getStatus());
+        state.setMessage("Queued by WorkflowProcessorQueue.");
+        inst.setState(state);
+        try {
+          this.repo.updateWorkflowInstance(inst);
+        } catch (Exception e) {
+          e.printStackTrace();
+          LOG.log(
+              Level.WARNING,
+              "Unable to update workflow instance: [" + inst.getId()
+                  + "] with status: [" + inst.getStatus() + "]: Message: "
+                  + e.getMessage());
+        }
+
+      } else {
+        System.out.println("NOT CHANGING state: info for inst: "
+            + inst.getState());
+      }
+      processors.add(fromWorkflowInstance(inst));
+    }
+
+    return processors;
+  }
+
+  private WorkflowProcessor fromWorkflowInstance(WorkflowInstance inst) {
+    WorkflowProcessor processor = null;
+    if (inst.getParentChildWorkflow().getTasks() != null
+        && inst.getParentChildWorkflow().getTasks().size() > 1) {
+      processor = new SequentialProcessor();
+      processor.setExecutionType("sequential");
+      processor.setWorkflowInstance(inst); 
+      
+      for (WorkflowTask task : inst.getParentChildWorkflow().getTasks()) {
+        WorkflowInstance instance = new WorkflowInstance();
+        instance.setState(inst.getState());
+        instance.setCurrentTaskId(task.getTaskId());
+        ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
+        workflow.setId("task-workflow-"+UUID.randomUUID().toString());
+        workflow.setName("Task Workflow-"+task.getTaskName());
+        workflow.getTasks().add(task);
+        workflow.getGraph().setTask(task);
+        instance.setId(UUID.randomUUID().toString());
+        instance.setParentChildWorkflow(workflow);
+        if(modelRepo != null){
+          try {
+            modelRepo.addWorkflow(workflow);
+          } catch (RepositoryException e) {
+            e.printStackTrace();
+          }
+        }
+        
+        WorkflowProcessor subProcessor = fromWorkflowInstance(instance);
+        processor.getSubProcessors().add(subProcessor);        
+      }      
+      processor.setState(inst.getState());
+    }
+    else{
+      processor = new TaskProcessor();
+      processor.setExecutionType("task");
+      processor.setWorkflowInstance(inst);
+      processor.setState(inst.getState());
+    }
+    
+    processor.setConditionProcessor(false);
+    processor.setDynamicMetadata(inst.getSharedContext());
+    processor.setPriority(inst.getPriority());
+    System.out.println("processor state: [" + processor.getState() + "]");
+    ProcessorDateTimeInfo dateTimeInfo = new ProcessorDateTimeInfo();
+    processor.setProcessorDateTimeInfo(dateTimeInfo);
+    processor.setStaticMetadata(new Metadata());
+    processor.setLifecycleManager(lifecycle);
+    
+    return processor;
+  }
+
+  private WorkflowLifecycle getLifecycle(Workflow workflow) {
+    return lifecycle.getLifecycleForWorkflow(workflow) != null ? lifecycle
+        .getLifecycleForWorkflow(workflow) : lifecycle.getDefaultLifecycle();
   }
 
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java Mon Aug 27 16:42:09 2012
@@ -20,6 +20,8 @@ package org.apache.oodt.cas.workflow.ins
 
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleStage;
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
 import org.apache.oodt.cas.workflow.structs.Priority;
 import org.apache.oodt.cas.workflow.structs.Workflow;
 import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
@@ -31,6 +33,7 @@ import org.apache.oodt.cas.workflow.stru
 //JDK imports
 import java.io.File;
 import java.io.IOException;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Vector;
@@ -479,8 +482,32 @@ public class LuceneWorkflowInstanceRepos
         // store the workflow instance info first
         doc.add(new Field("workflow_inst_id", workflowInst.getId(),
                 Field.Store.YES, Field.Index.UN_TOKENIZED));
+        
+        // will leave this for back compat, but will also store 
+        // category 
         doc.add(new Field("workflow_inst_status", workflowInst.getStatus(),
                 Field.Store.YES, Field.Index.UN_TOKENIZED));
+        
+        if(workflowInst.getState() != null){
+          WorkflowState state = workflowInst.getState();
+        
+          if(state.getDescription() != null){
+            doc.add(new Field("workflow_inst_state_desc",
+                state.getDescription(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+          }
+          
+          if(state.getMessage() != null){
+            doc.add(new Field("workflow_inst_state_message",
+                state.getMessage(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+          }
+          
+          if(state.getCategory() != null && state.getCategory().getName() != null){
+            doc.add(new Field("workflow_inst_state_category",
+                state.getCategory().getName(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+            System.out.println("Indexing category: ["+state.getCategory().getName()+"]");
+          }
+        }        
+        
         doc
                 .add(new Field("workflow_inst_current_task_id", workflowInst
                         .getCurrentTaskId(), Field.Store.YES,
@@ -623,7 +650,25 @@ public class LuceneWorkflowInstanceRepos
 
         // first read all the instance info
         inst.setId(doc.get("workflow_inst_id"));
-        inst.setStatus(doc.get("workflow_inst_status"));
+        
+        // try and construct a state
+        WorkflowState state = new WorkflowState();
+        state.setName(doc.get("workflow_inst_status"));
+        if(doc.get("workflow_inst_state_category") != null){
+          WorkflowLifecycleStage category = new WorkflowLifecycleStage();
+          category.setName(doc.get("workflow_inst_state_category"));
+          System.out.println("unserializing category: ["+category.getName()+"]");
+          state.setCategory(category);
+        }
+        
+        if(doc.get("workflow_inst_state_desc") != null){
+          state.setDescription(doc.get("workflow_inst_state_desc"));
+        }
+        
+        if(doc.get("workflow_inst_state_message") != null){
+          state.setMessage(doc.get("workflow_inst_state_message"));
+        }        
+        inst.setState(state);
         inst.setCurrentTaskId(doc.get("workflow_inst_current_task_id"));
         inst.setCurrentTaskStartDateTimeIsoStr(doc
                 .get("workflow_inst_currenttask_startdatetime"));

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/lifecycle/WorkflowLifecycle.java Mon Aug 27 16:42:09 2012
@@ -227,7 +227,7 @@ public class WorkflowLifecycle {
         if (stage.getStates() != null) {
           for (WorkflowState state : (List<WorkflowState>) stage.getStates()) {
             if (state.getName().equals(stateName)) {
-              return state;
+              return makeCopy(state);
             }
           }
         }
@@ -257,4 +257,17 @@ public class WorkflowLifecycle {
     return state;
   }
 
+  private WorkflowState makeCopy(WorkflowState state){
+    WorkflowState newState = new WorkflowState();
+    newState.setCategory(state.getCategory());
+    newState.setDescription(state.getDescription());
+    newState.setMessage(state.getMessage());
+    newState.setName(state.getName());
+    newState.setPrevState(state.getPrevState());
+    newState.setStartTime(state.getStartTime());
+    newState.setSubStates(state.getSubStates());    
+    return newState;
+  }
+  
+
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/ParentChildWorkflow.java Mon Aug 27 16:42:09 2012
@@ -40,6 +40,7 @@ public class ParentChildWorkflow extends
   }
 
   public ParentChildWorkflow(Graph graph) {
+    super();
     this.graph = graph;
   }
 

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/GenericWorkflowObjectFactory.java Mon Aug 27 16:42:09 2012
@@ -25,6 +25,7 @@ import org.apache.oodt.cas.workflow.inst
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepositoryFactory;
 import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
 import org.apache.oodt.cas.workflow.repository.WorkflowRepositoryFactory;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
 import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
@@ -339,6 +340,34 @@ public final class GenericWorkflowObject
 		} else
 			return null;
 	}
+	
+	public static PrioritySorter getPrioritySorterFromClassName(String className){
+	  if(className != null){
+	    try{
+	      Class<PrioritySorter> sorterClass = (Class<PrioritySorter>)Class.forName(className);	      
+	      return sorterClass.newInstance();
+	    }
+	    catch (ClassNotFoundException e) {
+        e.printStackTrace();
+        LOG.log(Level.WARNING, "Unable to locate workflow prioritizer class: "
+            + className + ": cannot instantiate!");
+        return null;
+      } catch (InstantiationException e) {
+        e.printStackTrace();
+        LOG.log(Level.WARNING,
+            "Unable to instantiate workflow prioritizer class: " + className
+                + ": Reason: " + e.getMessage() + " !");
+        return null;
+      } catch (IllegalAccessException e) {
+        e.printStackTrace();
+        LOG.log(Level.WARNING,
+            "IllegalAccessException when instantiating workflow prioritizer class: "
+                + className + ": cannot instantiate!");
+        return null;
+      }
+	  }
+	  else return null;
+	}
 
 	public static List copyWorkflows(List workflows){
 		if(workflows != null){

Modified: oodt/trunk/workflow/src/main/resources/workflow.properties
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/resources/workflow.properties?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/resources/workflow.properties (original)
+++ oodt/trunk/workflow/src/main/resources/workflow.properties Mon Aug 27 16:42:09 2012
@@ -24,6 +24,9 @@ workflow.engine.factory = org.apache.ood
 # workflow instance repository factory
 workflow.engine.instanceRep.factory = org.apache.oodt.cas.workflow.instrepo.LuceneWorkflowInstanceRepositoryFactory
 
+# engine runner factory
+workflow.wengine.runner.factory=org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunnerFactory
+
 # thread pool workflow engine properties
 org.apache.oodt.cas.workflow.engine.queueSize=
 org.apache.oodt.cas.workflow.engine.maxPoolSize=
@@ -32,6 +35,13 @@ org.apache.oodt.cas.workflow.engine.thre
 org.apache.oodt.cas.workflow.engine.unlimitedQueue=true
 org.apache.oodt.cas.workflow.engine.preConditionWaitTime=10
 
+# wengine properties
+# define workflow prioritizer class to use for sorting workflow tasks
+org.apache.oodt.cas.workflow.wengine.prioritizer=org.apache.oodt.cas.workflow.structs.FILOPrioritySorter
+
+# the maximum number of threads to be used by the asynchronous engine runner
+org.apache.oodt.cas.workflow.engine.asynchronous.runner.num.threads=25
+
 # set this if you want the workflow manager to submit jobs through the resource mgr
 org.apache.oodt.cas.workflow.engine.resourcemgr.url=
 
@@ -68,3 +78,7 @@ org.apache.oodt.cas.workflow.repo.dataso
 # Spring command line option and action store properties
 org.apache.oodt.cas.cli.action.spring.config=src/main/resources/cmd-line-actions.xml
 org.apache.oodt.cas.cli.option.spring.config=src/main/resources/cmd-line-options.xml
+
+# Workflow Lifecycle Manager: path to the workflow lifecycle XML definition file
+org.apache.oodt.cas.workflow.lifecycle.filePath=/path/to/workflow-lifecycle.xml
+

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java Mon Aug 27 16:42:09 2012
@@ -45,7 +45,7 @@ public class MetSetterTaskQuerier extend
    */
   public MetSetterTaskQuerier(WorkflowProcessorQueue processorQueue,
       PrioritySorter prioritizer) {
-    super(processorQueue, prioritizer);
+    super(processorQueue, prioritizer, null);
   }
 
   /*

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java Mon Aug 27 16:42:09 2012
@@ -39,6 +39,7 @@ public class MockProcessorQueue extends 
   private boolean consumed;
 
   public MockProcessorQueue() {
+    super(null,null,null);
     this.utils = new QuerierAndRunnerUtils();
     this.consumed = false;
   }

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1377737&r1=1377736&r2=1377737&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java Mon Aug 27 16:42:09 2012
@@ -50,7 +50,7 @@ public class TestTaskQuerier extends Tes
     assertNotNull(queued = processorQueue.getProcessors());
     assertEquals(3, queued.size());
     processorQueue = new MockProcessorQueue();
-    TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+    TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer, null);
     Thread querierThread = new Thread(querier);
     querierThread.start();
     List<WorkflowProcessor> runnables = null;
@@ -74,7 +74,7 @@ public class TestTaskQuerier extends Tes
     assertNotNull(queued = processorQueue.getProcessors());
     assertEquals(3, queued.size());
     processorQueue = new MockProcessorQueue();
-    TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+    TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer, null);
     Thread querierThread = new Thread(querier);
     querierThread.start();
     List<WorkflowProcessor> runnables = null;