You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/02/21 22:17:29 UTC

svn commit: r1292037 - in /oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine: PrioritizedQueueBasedWorkflowEngine.java ThreadPoolWorkflowEngine.java

Author: mattmann
Date: Tue Feb 21 21:17:28 2012
New Revision: 1292037

URL: http://svn.apache.org/viewvc?rev=1292037&view=rev
Log:
- OODT-215/OODT-310 WIP: Port WEngine to trunk
  - add pqueue-based engine (the basis for wengine2)
     - just a skeleton
  - modifying thread pool wengine to work with OODT-381 runner framework
  

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1292037&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Tue Feb 21 21:17:28 2012
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+import java.net.URL;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.Vector;
+
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
+import org.apache.oodt.commons.util.DateConvert;
+
+/**
+ * 
+ * Describe your class here.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class PrioritizedQueueBasedWorkflowEngine implements WorkflowEngine {
+
+  private WorkflowInstanceRepository repo;
+  
+  private PrioritySorter prioritizer;
+  
+  private URL wmgrUrl;
+  
+  private long conditionWait;
+  
+  public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo, PrioritySorter prioritizer, long conditionWait){
+    this.repo = repo;
+    this.prioritizer = prioritizer;
+    this.wmgrUrl = null;
+    this.conditionWait = conditionWait;
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache
+   * .oodt.cas.workflow.structs.Workflow, org.apache.oodt.cas.metadata.Metadata)
+   */
+  @Override
+  public WorkflowInstance startWorkflow(Workflow workflow, Metadata metadata)
+      throws EngineException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
+   * .String)
+   */
+  @Override
+  public void stopWorkflow(String workflowInstId) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
+   * (java.lang.String)
+   */
+  @Override
+  public void pauseWorkflowInstance(String workflowInstId) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
+   * (java.lang.String)
+   */
+  @Override
+  public void resumeWorkflowInstance(String workflowInstId) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
+   */
+  @Override
+  public WorkflowInstanceRepository getInstanceRepository() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.
+   * lang.String, org.apache.oodt.cas.metadata.Metadata)
+   */
+  @Override
+  public boolean updateMetadata(String workflowInstId, Metadata met) {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl
+   * (java.net.URL)
+   */
+  @Override
+  public void setWorkflowManagerUrl(URL url) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
+   * java.lang.String)
+   */
+  @Override
+  public double getWallClockMinutes(String workflowInstId) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#
+   * getCurrentTaskWallClockMinutes(java.lang.String)
+   */
+  @Override
+  public double getCurrentTaskWallClockMinutes(String workflowInstId) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata
+   * (java.lang.String)
+   */
+  @Override
+  public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  class QueueWorker implements Runnable{
+    
+    private boolean work;
+    
+    public QueueWorker(){
+      this.work = true;
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+      while(work) {
+        try {
+          List<WorkflowInstance> instances = null;
+          synchronized(repo){
+            instances = repo.getWorkflowInstances();
+          }
+          
+          List<WorkflowProcessor> runnableProcessors = new Vector<WorkflowProcessor>();
+          for (WorkflowInstance instance : instances) {
+            if (!work)
+              break;
+            if (isRunnableInstance(instance)) {
+              synchronized(repo){
+                instance.setStatus(WorkflowStatus.STARTED);
+                repo.updateWorkflowInstance(instance);
+              }
+              
+              synchronized(runnableProcessors){
+                WorkflowInstance inst = new WorkflowInstance();
+                Workflow workflow = new Workflow();
+                workflow.setId(instance.getId()+"-"+instance.getCurrentTaskId());
+                WorkflowTask task = getTask(instance.getWorkflow().getTasks(), instance.getCurrentTaskId());
+                workflow.setName(task.getTaskName());
+                workflow.getTasks().add(task);
+                inst.setId(UUID.randomUUID().toString());
+                inst.setWorkflow(workflow);
+                inst.setCurrentTaskStartDateTimeIsoStr(DateConvert.isoFormat(new Date()));
+                inst.setPriority(instance.getPriority());
+                inst.setSharedContext(instance.getSharedContext());
+                
+                SequentialProcessor processor = 
+                  new SequentialProcessor(inst, repo, wmgrUrl, conditionWait);
+                runnableProcessors.add(processor);
+              }
+            }
+
+            prioritizer.sort(runnableProcessors);
+            
+            //take a breather
+            try {
+              synchronized(this) {
+                this.wait(1);
+              }
+            }catch (Exception e){}
+          }
+        }catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+      
+      try {
+        synchronized(this) {
+          this.wait(2000);
+        }
+      }catch (Exception e){}
+    }
+    
+    private WorkflowTask getTask(List<WorkflowTask> tasks, String id){
+      if(tasks != null && tasks.size() > 0){
+        for(WorkflowTask task: tasks){
+          if(task.getTaskId().equals(id)){
+            return task;
+          }
+        }
+      }
+      
+      return null;
+    }
+    
+    private boolean isRunnableInstance(WorkflowInstance instance){
+       return !instance.getStatus().equals(WorkflowStatus.ERROR) && 
+       !instance.getStatus().equals(WorkflowStatus.FINISHED) && 
+       !instance.getStatus().equals(WorkflowStatus.METADATA_MISSING) && 
+       !instance.getStatus().equals(WorkflowStatus.PAUSED);
+    }
+  }
+
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java?rev=1292037&r1=1292036&r2=1292037&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java Tue Feb 21 21:17:28 2012
@@ -19,19 +19,27 @@ package org.apache.oodt.cas.workflow.eng
 
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
 import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.structs.TaskJobInput;
 import org.apache.oodt.cas.workflow.structs.Workflow;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
 import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
 import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
 import org.apache.oodt.cas.workflow.engine.SequentialProcessor;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
 import org.apache.oodt.commons.util.DateConvert;
 
 //JDK imports
+import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.text.ParseException;
 import java.util.Date;
 import java.util.HashMap;
@@ -69,15 +77,16 @@ public class ThreadPoolWorkflowEngine im
   /* our instance repository */
   private WorkflowInstanceRepository instRep = null;
 
-  /* our resource manager client */
-  private XmlRpcResourceManagerClient rClient = null;
-
   /* the URL pointer to the parent Workflow Manager */
   private URL wmgrUrl = null;
 
   /* how long to wait before checking whether a condition is satisfied. */
   private long conditionWait;
 
+  private ConditionProcessor condProcessor;
+
+  private EngineRunner runner;
+
   /**
    * Default Constructor.
    * 
@@ -120,12 +129,17 @@ public class ThreadPoolWorkflowEngine im
 
     workerMap = new HashMap();
 
-    if (resUrl != null)
-      rClient = new XmlRpcResourceManagerClient(resUrl);
+    if (resUrl != null) {
+      this.runner = new ResourceRunner(resUrl);
+    } else {
+      this.runner = new AsynchronousLocalEngineRunner();
+    }
 
     this.conditionWait = Long.getLong(
         "org.apache.oodt.cas.workflow.engine.preConditionWaitTime", 10)
         .longValue();
+
+    this.condProcessor = new ConditionProcessor();
   }
 
   /*
@@ -137,7 +151,7 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized void pauseWorkflowInstance(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialProcessor worker = ((ThreadedProcessor) workerMap
+    SequentialProcessor worker = ((ThreadedExecutor) workerMap
         .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
@@ -161,7 +175,7 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized void resumeWorkflowInstance(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialProcessor worker = ((ThreadedProcessor) workerMap
+    SequentialProcessor worker = ((ThreadedExecutor) workerMap
         .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
@@ -173,7 +187,7 @@ public class ThreadPoolWorkflowEngine im
 
     // also check to make sure that the worker is currently paused
     // only can resume WorkflowInstances that are paused, right?
-    if (!worker.isPaused()) {
+    if (true/*!worker.isPaused()*/) {
       LOG.log(Level.WARNING,
           "WorkflowEngine: Attempt to resume a workflow that "
               + "isn't paused currently: instance id: " + workflowInstId);
@@ -208,16 +222,15 @@ public class ThreadPoolWorkflowEngine im
     wInst.setStatus(CREATED);
     persistWorkflowInstance(wInst);
 
-    SequentialProcessor worker = new SequentialProcessor(wInst,
-        instRep, this.wmgrUrl, this.conditionWait);
-    worker.setRClient(rClient);
+    SequentialProcessor worker = new SequentialProcessor(wInst, instRep,
+        this.wmgrUrl, this.conditionWait);
     workerMap.put(wInst.getId(), worker);
 
     wInst.setStatus(QUEUED);
     persistWorkflowInstance(wInst);
 
     try {
-      pool.execute(new ThreadedProcessor(worker));
+      pool.execute(new ThreadedExecutor(worker, this.condProcessor));
     } catch (InterruptedException e) {
       throw new EngineException(e);
     }
@@ -245,7 +258,7 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized boolean updateMetadata(String workflowInstId, Metadata met) {
     // okay, try and look up that worker thread in our hash map
-    SequentialProcessor worker = ((ThreadedProcessor) workerMap
+    SequentialProcessor worker = ((ThreadedExecutor) workerMap
         .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
@@ -290,7 +303,7 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized void stopWorkflow(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialProcessor worker = ((ThreadedProcessor) workerMap
+    SequentialProcessor worker = ((ThreadedExecutor) workerMap
         .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
@@ -324,7 +337,7 @@ public class ThreadPoolWorkflowEngine im
    */
   public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialProcessor worker = ((ThreadedProcessor) workerMap
+    SequentialProcessor worker = ((ThreadedExecutor) workerMap
         .get(workflowInstId)).getProcessor();
     if (worker == null) {
       // try and get the metadata
@@ -477,12 +490,19 @@ public class ThreadPoolWorkflowEngine im
     }
   }
 
-  class ThreadedProcessor implements Runnable {
+  class ThreadedExecutor implements Runnable, CoreMetKeys {
 
     private SequentialProcessor processor;
 
-    public ThreadedProcessor(SequentialProcessor processor) {
+    private boolean running;
+
+    private ConditionProcessor conditionEvaluator;
+
+    public ThreadedExecutor(SequentialProcessor processor,
+        ConditionProcessor conditionEvaluator) {
       this.processor = processor;
+      this.running = false;
+      this.conditionEvaluator = conditionEvaluator;
     }
 
     /*
@@ -492,7 +512,90 @@ public class ThreadPoolWorkflowEngine im
      */
     @Override
     public void run() {
-      processor.start();
+      String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
+      this.getProcessor().getWorkflowInstance()
+          .setStartDateTimeIsoStr(startDateTimeIsoStr);
+      this.getProcessor().persistWorkflowInstance();
+
+      while (running && this.getProcessor().getRunnableSubProcessors() != null
+          && this.getProcessor().getRunnableSubProcessors().size() > 0) {
+        if (isPaused()) {
+          LOG.log(
+              Level.FINE,
+              "SequentialProcessor: Skipping execution: Paused: CurrentTask: "
+                  + this.getProcessor().getTaskNameById(
+                      this.getProcessor().getCurrentTaskId()));
+          continue;
+        }
+
+        TaskProcessor taskProcessor = (TaskProcessor) this.processor
+            .getRunnableSubProcessors().get(0);
+        WorkflowTask task = taskProcessor.getTask();
+        this.getProcessor().getWorkflowInstance()
+            .setCurrentTaskId(task.getTaskId());
+
+        this.getProcessor().persistWorkflowInstance();
+        if (!taskProcessor.checkTaskRequiredMetadata(this.getProcessor()
+            .getWorkflowInstance().getSharedContext())) {
+          this.getProcessor().getWorkflowInstance().setStatus(METADATA_MISSING);
+          this.getProcessor().persistWorkflowInstance();
+          return;
+        }
+
+        if (task.getConditions() != null) {
+          if (!this.conditionEvaluator.satisfied(task.getConditions(),
+              task.getTaskId(), this.getProcessor().getWorkflowInstance()
+                  .getSharedContext())
+              && isRunning()) {
+
+            LOG.log(Level.FINEST,
+                "Pre-conditions for task: " + task.getTaskName()
+                    + " unsatisfied");
+
+            if (!isPaused()) {
+              this.getProcessor().getWorkflowInstance()
+                  .setStatus(WorkflowStatus.PAUSED);
+            }
+            continue;
+          }
+        }
+        LOG.log(Level.FINEST, "Executing task: " + task.getTaskName());
+
+        this.addStdWorkflowMetadata(getProcessor().getWorkflowInstance(), task,
+            getProcessor().getWorkflowInstance().getSharedContext(), wmgrUrl);
+
+        if (runner instanceof ResourceRunner) {
+          getProcessor().getWorkflowInstance().setStatus(RESMGR_SUBMIT);
+          //persistWorkflowInstance();
+          /*runner.execute(task, getProcessor().getWorkflowInstance()
+              .getSharedContext());*/
+        } else {
+          //this.workflowInstance.setStatus(STARTED);
+          //this.persistWorkflowInstance();
+          String currentTaskIsoStartDateTimeStr = DateConvert
+              .isoFormat(new Date());
+          //this.workflowInstance
+              //.setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
+          //this.workflowInstance.setCurrentTaskEndDateTimeIsoStr(null);
+          /*runner.execute(task, getProcessor().getWorkflowInstance()
+              .getSharedContext());*/
+
+          String currentTaskIsoEndDateTimeStr = DateConvert
+              .isoFormat(new Date());
+          //this.workflowInstance
+            //  .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
+         // this.persistWorkflowInstance();
+        }
+
+        LOG.log(Level.FINEST, "Completed task: " + task.getTaskName());
+
+      }
+
+      /*LOG.log(Level.FINEST, "Completed workflow: "
+          + this.workflowInstance.getWorkflow().getName());
+      if (isRunning()) {
+        stop();
+      }*/
     }
 
     /**
@@ -510,6 +613,62 @@ public class ThreadPoolWorkflowEngine im
       this.processor = processor;
     }
 
+    /**
+     * @return the running
+     */
+    public boolean isRunning() {
+      return running;
+    }
+
+    /**
+     * @param running
+     *          the running to set
+     */
+    public void setRunning(boolean running) {
+      this.running = running;
+    }
+
+    /**
+     * @return the conditionEvaluator
+     */
+    public ConditionProcessor getConditionEvaluator() {
+      return conditionEvaluator;
+    }
+
+    /**
+     * @param conditionEvaluator
+     *          the conditionEvaluator to set
+     */
+    public void setConditionEvaluator(ConditionProcessor conditionEvaluator) {
+      this.conditionEvaluator = conditionEvaluator;
+    }
+
+    public boolean isPaused() {
+      return this.getProcessor().getWorkflowInstance().getStatus()
+          .equals(WorkflowStatus.PAUSED);
+    }
+
+    protected void addStdWorkflowMetadata(WorkflowInstance wInst,
+        WorkflowTask task, Metadata ctx, URL wUrl) {
+      ctx.replaceMetadata(TASK_ID, task.getTaskId());
+      ctx.replaceMetadata(WORKFLOW_INST_ID, wInst.getId());
+      ctx.replaceMetadata(JOB_ID, wInst.getId());
+      ctx.replaceMetadata(PROCESSING_NODE, getHostname());
+      ctx.replaceMetadata(WORKFLOW_MANAGER_URL, wUrl.toString());
+    }
+
+    protected String getHostname() {
+      try {
+        // Get hostname by textual representation of IP address
+        InetAddress addr = InetAddress.getLocalHost();
+        // Get the host name
+        String hostname = addr.getHostName();
+        return hostname;
+      } catch (UnknownHostException e) {
+      }
+      return null;
+    }
+
   }
 
 }