You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2011/08/14 07:39:57 UTC

svn commit: r1157470 [1/3] - in /oodt/trunk/workflow/src: main/java/org/apache/oodt/cas/workflow/engine/ main/java/org/apache/oodt/cas/workflow/instrepo/ main/java/org/apache/oodt/cas/workflow/repository/ main/java/org/apache/oodt/cas/workflow/structs/...

Author: mattmann
Date: Sun Aug 14 05:39:56 2011
New Revision: 1157470

URL: http://svn.apache.org/viewvc?rev=1157470&view=rev
Log:
- progress towards OODT-215: WorkflowInstances should have pre-conditions as well

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java
    oodt/trunk/workflow/src/main/resources/examples/condition.workflow.xml   (with props)
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/DataSourceWorkflowRepository.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/WorkflowRepository.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/XMLWorkflowRepository.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/Workflow.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlRpcStructFactory.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java
    oodt/trunk/workflow/src/main/resources/examples/wengine/hello-goodbye.xml
    oodt/trunk/workflow/src/main/resources/workflow.sql
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestWorkflowDataSourceRepository.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestWorkflowRepository.java
    oodt/trunk/workflow/src/testdata/workflow.sql

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java?rev=1157470&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java Sun Aug 14 05:39:56 2011
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+import org.apache.oodt.commons.util.DateConvert;
+
+/**
+ * The Strategy for evaluating {@link WorkflowCondition}s. Maintains an internal
+ * cache of created {@link WorkflowConditionInstance}s, and then leverages those
+ * to perform evaluations based on optional, timeout, and other control flow
+ * rules. Also maintains an internal cache of {@link WorkflowCondition} start
+ * times, so that timeouts can be computed.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ */
+public class ConditionEvaluator {
+
+  /**
+   * First-evaluation time (ISO 8601) per condition id. NOTE(review): keyed by
+   * condition id alone, unlike CONDITION_CACHE -- confirm this is intended.
+   */
+  protected Map<String, String> COND_TIMEOUTS;
+
+  /** Condition instances, keyed by caller-supplied id, then by condition id. */
+  protected Map<String, HashMap<String, WorkflowConditionInstance>> CONDITION_CACHE;
+
+  private static final Logger LOG = Logger.getLogger(ConditionEvaluator.class
+      .getName());
+
+  public ConditionEvaluator() {
+    this.COND_TIMEOUTS = new HashMap<String, String>();
+    this.CONDITION_CACHE = new HashMap<String, HashMap<String, WorkflowConditionInstance>>();
+  }
+
+  /**
+   * Evaluates the conditions in order. A condition blocks when it evaluates to
+   * false and is neither optional nor timed out.
+   * @param conditionList the conditions to check.
+   * @param id the key under which condition instances are cached.
+   * @param context the metadata handed to each condition instance.
+   * @return false if some condition blocks, true otherwise.
+   */
+  protected boolean satisfied(List<WorkflowCondition> conditionList, String id,
+      Metadata context) {
+    for (WorkflowCondition c : conditionList) {
+      // record when this condition was first seen; timedOut() measures from it
+      if (!COND_TIMEOUTS.containsKey(c.getConditionId())) {
+        COND_TIMEOUTS.put(c.getConditionId(), generateISO8601());
+      }
+      WorkflowConditionInstance cInst = getConditionInstance(id, c);
+      boolean result = cInst.evaluate(context, c.getCondConfig());
+      // keep this order: isOptional() and timedOut() log as a side effect
+      if (!result && !isOptional(c, result) && !timedOut(c)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /** Returns the cached instance for (id, condition), creating it if absent. */
+  private WorkflowConditionInstance getConditionInstance(String id,
+      WorkflowCondition c) {
+    HashMap<String, WorkflowConditionInstance> conditionMap = CONDITION_CACHE
+        .get(id);
+    if (conditionMap == null) {
+      conditionMap = new HashMap<String, WorkflowConditionInstance>();
+      CONDITION_CACHE.put(id, conditionMap);
+    }
+
+    WorkflowConditionInstance cInst = conditionMap.get(c.getConditionId());
+    if (cInst == null) {
+      cInst = GenericWorkflowObjectFactory.getConditionObjectFromClassName(c
+          .getConditionInstanceClassName());
+      conditionMap.put(c.getConditionId(), cInst);
+    }
+    return cInst;
+  }
+
+  /** Returns whether the condition is optional, logging how result is used. */
+  protected boolean isOptional(WorkflowCondition condition, boolean result) {
+    if (condition.isOptional()) {
+      LOG.log(Level.WARNING, "Condition: [" + condition.getConditionId()
+          + "] is optional: evaluation results: [" + result + "] ignored");
+      return true;
+    } else {
+      LOG.log(Level.INFO, "Condition: [" + condition.getConditionId()
+          + "] is required: evaluation results: [" + result + "] included.");
+      return false;
+    }
+  }
+
+  /**
+   * Checks whether the condition's timeout has elapsed since the condition was
+   * first evaluated. A timeout of -1 disables the check; a missing or
+   * unparseable start time is treated as not timed out.
+   */
+  protected boolean timedOut(WorkflowCondition condition) {
+    if (condition.getTimeoutSeconds() == -1) {
+      return false;
+    }
+    String isoStartDateTimeStr = COND_TIMEOUTS.get(condition.getConditionId());
+    Date isoStartDateTime = null;
+    try {
+      isoStartDateTime = DateConvert.isoParse(isoStartDateTimeStr);
+    } catch (Exception e) {
+      // hand the exception to the logger instead of printing to stderr
+      LOG.log(Level.WARNING, "Unable to parse start date time for condition: ["
+          + condition.getConditionId() + "]: start date time: ["
+          + isoStartDateTimeStr + "]: Reason: " + e.getMessage(), e);
+      return false;
+    }
+    if (isoStartDateTime == null) {
+      // nothing to measure from, so the timeout cannot have been exceeded
+      return false;
+    }
+    long numSecondsElapsed = (new Date().getTime() - isoStartDateTime
+        .getTime()) / 1000;
+    boolean expired = numSecondsElapsed >= condition.getTimeoutSeconds();
+    LOG.log(expired ? Level.INFO : Level.FINEST, "Condition: ["
+        + condition.getConditionName()
+        + (expired ? "]: exceeded" : "]: has not exceeded")
+        + " timeout threshold of: [" + condition.getTimeoutSeconds()
+        + "] seconds: elapsed time: [" + numSecondsElapsed + "]");
+    return expired;
+  }
+
+  private String generateISO8601() {
+    return DateConvert.isoFormat(new Date());
+  }
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java?rev=1157470&r1=1157469&r2=1157470&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java Sun Aug 14 05:39:56 2011
@@ -60,11 +60,14 @@ public class SequentialWorkflowProcessor
   /* our log stream */
   private static Logger LOG = Logger
       .getLogger(SequentialWorkflowProcessor.class.getName());
+  
+  private ConditionEvaluator conditionEvaluator;
 
   public SequentialWorkflowProcessor(WorkflowInstance wInst,
       WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
     super(wInst, instRep, wParentUrl, conditionWait);
     taskIterator = this.workflowInstance.getWorkflow().getTasks().iterator();
+    this.conditionEvaluator = new ConditionEvaluator();
   }
 
   /*
@@ -98,8 +101,8 @@ public class SequentialWorkflowProcessor
       }
 
       if (task.getConditions() != null) {
-        while (!satisfied(task.getConditions(), task.getTaskId())
-            && isRunning()) {
+        while(!this.conditionEvaluator.satisfied(task.getConditions(), task.getTaskId(),
+            this.workflowInstance.getSharedContext()) && isRunning()) {
 
           if (!isPaused()) {
             pause();

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1157470&r1=1157469&r2=1157470&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Sun Aug 14 05:39:56 2011
@@ -21,11 +21,7 @@ package org.apache.oodt.cas.workflow.eng
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -33,16 +29,11 @@ import java.util.logging.Logger;
 import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
-import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
-import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
 import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
 import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
-import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
-import org.apache.oodt.commons.date.DateUtils;
-import org.apache.oodt.commons.util.DateConvert;
 
 /**
  * An abstract base class representing the methodology for processing a
@@ -72,15 +63,11 @@ public abstract class WorkflowProcessor 
 
   protected static final String DEFAULT_QUEUE_NAME = "high";
 
-  protected Map<String, HashMap<String, WorkflowConditionInstance>> CONDITION_CACHE = new HashMap<String, HashMap<String, WorkflowConditionInstance>>();
-
   protected URL wmgrParentUrl = null;
 
   protected String currentJobId = null;
 
   protected boolean paused = false;
-  
-  protected Map<String, String> COND_TIMEOUTS = new HashMap<String, String>();
 
   public WorkflowProcessor(WorkflowInstance workflowInstance,
       WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
@@ -252,102 +239,6 @@ public abstract class WorkflowProcessor 
     return true;
   }
 
-  protected boolean satisfied(List<WorkflowCondition> conditionList,
-      String taskId) {
-    for (WorkflowCondition c : conditionList) {
-      WorkflowConditionInstance cInst = null;
-      if(!COND_TIMEOUTS.containsKey(c.getConditionId())){
-          COND_TIMEOUTS.put(c.getConditionId(), generateISO8601());
-      }
-
-      // see if we've already cached this condition instance
-      if (CONDITION_CACHE.get(taskId) != null) {
-        HashMap<String, WorkflowConditionInstance> conditionMap = CONDITION_CACHE
-            .get(taskId);
-
-        /*
-         * okay we have some conditions cached for this task, see if we have the
-         * one we need
-         */
-        if (conditionMap.get(c.getConditionId()) != null) {
-          cInst = (WorkflowConditionInstance) conditionMap.get(c
-              .getConditionId());
-        }
-        /* if not, then go ahead and create it and cache it */
-        else {
-          cInst = GenericWorkflowObjectFactory
-              .getConditionObjectFromClassName(c
-                  .getConditionInstanceClassName());
-          conditionMap.put(c.getConditionId(), cInst);
-        }
-      }
-      /* no conditions cached yet, so set everything up */
-      else {
-        HashMap<String, WorkflowConditionInstance> conditionMap = new HashMap<String, WorkflowConditionInstance>();
-        cInst = GenericWorkflowObjectFactory.getConditionObjectFromClassName(c
-            .getConditionInstanceClassName());
-        conditionMap.put(c.getConditionId(), cInst);
-        CONDITION_CACHE.put(taskId, conditionMap);
-      }
-
-      // actually perform the evaluation
-      boolean result = false;
-      if (!(result = cInst.evaluate(this.workflowInstance.getSharedContext(),
-          c.getCondConfig())) && !isOptional(c, result) && !timedOut(c)){
-        return false;
-      }
-    }
-
-    return true;
-  }
-  
-  protected boolean isOptional(WorkflowCondition condition, boolean result) {
-    if (condition.isOptional()) {
-      LOG.log(Level.WARNING, "Condition: [" + condition.getConditionId()
-          + "] is optional: evaluation results: ["+result+"] ignored");
-      return true;
-    } else {
-      LOG.log(Level.INFO, "Condition: [" + condition.getConditionId()
-          + "] is required: evaluation results: ["+result+"] included.");
-      return false;
-    }
-  }
-
-  protected boolean timedOut(WorkflowCondition condition) {
-    if (condition.getTimeoutSeconds() == -1)
-      return false;
-    String isoStartDateTimeStr = COND_TIMEOUTS.get(condition.getConditionId());
-    Date isoStartDateTime = null;
-    try {
-      isoStartDateTime = DateConvert.isoParse(isoStartDateTimeStr);
-    } catch (Exception e) {
-      e.printStackTrace();
-      LOG.log(Level.WARNING, "Unable to parse start date time for condition: ["
-          + condition.getConditionId() + "]: start date time: ["
-          + isoStartDateTimeStr + "]: Reason: " + e.getMessage());
-      return false;
-    }
-    Date now = new Date();
-    long numSecondsElapsed = (now.getTime() - isoStartDateTime.getTime()) / (1000);
-    if (numSecondsElapsed >= condition.getTimeoutSeconds()) {
-      LOG.log(
-          Level.INFO,
-          "Condition: [" + condition.getConditionName()
-              + "]: exceeded timeout threshold of: ["
-              + condition.getTimeoutSeconds() + "] seconds: elapsed time: ["
-              + numSecondsElapsed + "]");
-      return true;
-    } else{
-      LOG.log(
-          Level.FINEST,
-          "Condition: [" + condition.getConditionName()
-              + "]: has not exceeded timeout threshold of: ["
-              + condition.getTimeoutSeconds() + "] seconds: elapsed time: ["
-              + numSecondsElapsed + "]");      
-      return false;
-    }
-  }
-
   protected String getTaskNameById(String taskId) {
     for (WorkflowTask task : (List<WorkflowTask>) (List<?>) this.workflowInstance
         .getWorkflow().getTasks()) {
@@ -370,9 +261,5 @@ public abstract class WorkflowProcessor 
     }
     return null;
   }
-  
-  private String generateISO8601(){
-    return DateConvert.isoFormat(new Date());
-  }
 
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java?rev=1157470&r1=1157469&r2=1157470&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java Sun Aug 14 05:39:56 2011
@@ -516,6 +516,11 @@ public class LuceneWorkflowInstanceRepos
 
         // store the tasks
         addTasksToDoc(doc, workflowInst.getWorkflow().getTasks());
+        
+        // store workflow conditions
+        addConditionsToDoc("workflow_condition_"+workflowInst.getWorkflow().getId(), 
+            workflowInst.getWorkflow().getConditions()
+            , doc);
 
         // add the default field (so that we can do a query for *)
         doc.add(new Field("myfield", "myvalue", Field.Store.YES,
@@ -644,6 +649,7 @@ public class LuceneWorkflowInstanceRepos
         workflow.setId(doc.get("workflow_id"));
         workflow.setName(doc.get("workflow_name"));
         workflow.setTasks(toTasks(doc));
+        workflow.setConditions(toConditions("workflow_condition_"+workflow.getId(), doc));
 
         inst.setWorkflow(workflow);