You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2011/08/14 07:39:57 UTC
svn commit: r1157470 [1/3] - in /oodt/trunk/workflow/src:
main/java/org/apache/oodt/cas/workflow/engine/
main/java/org/apache/oodt/cas/workflow/instrepo/
main/java/org/apache/oodt/cas/workflow/repository/
main/java/org/apache/oodt/cas/workflow/structs/...
Author: mattmann
Date: Sun Aug 14 05:39:56 2011
New Revision: 1157470
URL: http://svn.apache.org/viewvc?rev=1157470&view=rev
Log:
- progress towards OODT-215: WorkflowInstances should have pre-conditions as well
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java
oodt/trunk/workflow/src/main/resources/examples/condition.workflow.xml (with props)
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/DataSourceWorkflowRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/WorkflowRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/XMLWorkflowRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/Workflow.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlRpcStructFactory.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java
oodt/trunk/workflow/src/main/resources/examples/wengine/hello-goodbye.xml
oodt/trunk/workflow/src/main/resources/workflow.sql
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestWorkflowDataSourceRepository.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestWorkflowRepository.java
oodt/trunk/workflow/src/testdata/workflow.sql
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java?rev=1157470&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionEvaluator.java Sun Aug 14 05:39:56 2011
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+import org.apache.oodt.commons.util.DateConvert;
+
+/**
+ * Strategy for evaluating {@link WorkflowCondition}s. Caches the
+ * {@link WorkflowConditionInstance}s it creates (per caller-supplied id, then
+ * per condition id) and records when each condition id was first seen so that
+ * {@link #timedOut(WorkflowCondition)} can measure elapsed time. An unmet
+ * condition only blocks when it is neither optional nor timed out.
+ *
+ * @author mattmann
+ * @version $Revision$
+ */
+public class ConditionEvaluator {
+
+  /** ISO 8601 time at which each condition id was first evaluated. */
+  protected Map<String, String> COND_TIMEOUTS;
+
+  /** Condition instances keyed by caller-supplied id, then condition id. */
+  protected Map<String, HashMap<String, WorkflowConditionInstance>> CONDITION_CACHE;
+
+  private static final Logger LOG = Logger.getLogger(ConditionEvaluator.class
+      .getName());
+
+  public ConditionEvaluator() {
+    this.COND_TIMEOUTS = new HashMap<String, String>();
+    this.CONDITION_CACHE = new HashMap<String, HashMap<String, WorkflowConditionInstance>>();
+  }
+
+  /**
+   * Evaluates every condition in order against the given context.
+   *
+   * @param conditionList the conditions to evaluate
+   * @param id key under which instances are cached (callers pass a task id)
+   * @param context metadata handed to each condition instance
+   * @return false as soon as a condition fails that is neither optional nor
+   *         timed out; true otherwise
+   */
+  protected boolean satisfied(List<WorkflowCondition> conditionList, String id,
+      Metadata context) {
+    for (WorkflowCondition c : conditionList) {
+      // remember when this condition was first seen; timedOut() measures from it
+      if (!COND_TIMEOUTS.containsKey(c.getConditionId())) {
+        COND_TIMEOUTS.put(c.getConditionId(), generateISO8601());
+      }
+
+      // NOTE(review): a null instance from the factory would NPE below -- confirm
+      WorkflowConditionInstance cInst = getConditionInstance(id, c);
+      boolean result = cInst.evaluate(context, c.getCondConfig());
+      if (!result && !isOptional(c, result) && !timedOut(c)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Returns the cached instance for (id, condition), creating it on a miss. */
+  private WorkflowConditionInstance getConditionInstance(String id,
+      WorkflowCondition c) {
+    HashMap<String, WorkflowConditionInstance> conditionMap = CONDITION_CACHE
+        .get(id);
+    if (conditionMap == null) {
+      conditionMap = new HashMap<String, WorkflowConditionInstance>();
+      CONDITION_CACHE.put(id, conditionMap);
+    }
+    WorkflowConditionInstance cInst = conditionMap.get(c.getConditionId());
+    if (cInst == null) {
+      cInst = GenericWorkflowObjectFactory.getConditionObjectFromClassName(c
+          .getConditionInstanceClassName());
+      conditionMap.put(c.getConditionId(), cInst);
+    }
+    return cInst;
+  }
+
+  /**
+   * Reports whether a condition is optional, logging how its result is treated.
+   *
+   * @param condition the condition to inspect
+   * @param result the evaluation result, used only in the log message
+   * @return true if the condition is optional
+   */
+  protected boolean isOptional(WorkflowCondition condition, boolean result) {
+    if (condition.isOptional()) {
+      LOG.log(Level.WARNING, "Condition: [" + condition.getConditionId()
+          + "] is optional: evaluation results: [" + result + "] ignored");
+      return true;
+    } else {
+      LOG.log(Level.INFO, "Condition: [" + condition.getConditionId()
+          + "] is required: evaluation results: [" + result + "] included.");
+      return false;
+    }
+  }
+
+  /**
+   * Reports whether the condition's timeout has elapsed since it was first seen.
+   *
+   * @param condition the condition to check
+   * @return true if at least {@code getTimeoutSeconds()} seconds have elapsed;
+   *         false if the timeout is -1 or the start time cannot be parsed
+   */
+  protected boolean timedOut(WorkflowCondition condition) {
+    if (condition.getTimeoutSeconds() == -1) {
+      return false;
+    }
+    String isoStartDateTimeStr = COND_TIMEOUTS.get(condition.getConditionId());
+    Date isoStartDateTime = null;
+    try {
+      isoStartDateTime = DateConvert.isoParse(isoStartDateTimeStr);
+    } catch (Exception e) {
+      // hand the exception to the logger rather than printing to stderr
+      LOG.log(Level.WARNING, "Unable to parse start date time for condition: ["
+          + condition.getConditionId() + "]: start date time: ["
+          + isoStartDateTimeStr + "]: Reason: " + e.getMessage(), e);
+      return false;
+    }
+    long numSecondsElapsed = (new Date().getTime() - isoStartDateTime.getTime()) / 1000;
+    boolean expired = numSecondsElapsed >= condition.getTimeoutSeconds();
+    LOG.log(expired ? Level.INFO : Level.FINEST, "Condition: ["
+        + condition.getConditionName() + "]: "
+        + (expired ? "exceeded" : "has not exceeded")
+        + " timeout threshold of: [" + condition.getTimeoutSeconds()
+        + "] seconds: elapsed time: [" + numSecondsElapsed + "]");
+    return expired;
+  }
+
+  private String generateISO8601() {
+    return DateConvert.isoFormat(new Date());
+  }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java?rev=1157470&r1=1157469&r2=1157470&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java Sun Aug 14 05:39:56 2011
@@ -60,11 +60,14 @@ public class SequentialWorkflowProcessor
/* our log stream */
private static Logger LOG = Logger
.getLogger(SequentialWorkflowProcessor.class.getName());
+
+ private ConditionEvaluator conditionEvaluator;
public SequentialWorkflowProcessor(WorkflowInstance wInst,
WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
super(wInst, instRep, wParentUrl, conditionWait);
taskIterator = this.workflowInstance.getWorkflow().getTasks().iterator();
+ this.conditionEvaluator = new ConditionEvaluator();
}
/*
@@ -98,8 +101,8 @@ public class SequentialWorkflowProcessor
}
if (task.getConditions() != null) {
- while (!satisfied(task.getConditions(), task.getTaskId())
- && isRunning()) {
+ while(!this.conditionEvaluator.satisfied(task.getConditions(), task.getTaskId(),
+ this.workflowInstance.getSharedContext()) && isRunning()) {
if (!isPaused()) {
pause();
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1157470&r1=1157469&r2=1157470&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Sun Aug 14 05:39:56 2011
@@ -21,11 +21,7 @@ package org.apache.oodt.cas.workflow.eng
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -33,16 +29,11 @@ import java.util.logging.Logger;
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
-import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
-import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
-import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
-import org.apache.oodt.commons.date.DateUtils;
-import org.apache.oodt.commons.util.DateConvert;
/**
* An abstract base class representing the methodology for processing a
@@ -72,15 +63,11 @@ public abstract class WorkflowProcessor
protected static final String DEFAULT_QUEUE_NAME = "high";
- protected Map<String, HashMap<String, WorkflowConditionInstance>> CONDITION_CACHE = new HashMap<String, HashMap<String, WorkflowConditionInstance>>();
-
protected URL wmgrParentUrl = null;
protected String currentJobId = null;
protected boolean paused = false;
-
- protected Map<String, String> COND_TIMEOUTS = new HashMap<String, String>();
public WorkflowProcessor(WorkflowInstance workflowInstance,
WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
@@ -252,102 +239,6 @@ public abstract class WorkflowProcessor
return true;
}
- protected boolean satisfied(List<WorkflowCondition> conditionList,
- String taskId) {
- for (WorkflowCondition c : conditionList) {
- WorkflowConditionInstance cInst = null;
- if(!COND_TIMEOUTS.containsKey(c.getConditionId())){
- COND_TIMEOUTS.put(c.getConditionId(), generateISO8601());
- }
-
- // see if we've already cached this condition instance
- if (CONDITION_CACHE.get(taskId) != null) {
- HashMap<String, WorkflowConditionInstance> conditionMap = CONDITION_CACHE
- .get(taskId);
-
- /*
- * okay we have some conditions cached for this task, see if we have the
- * one we need
- */
- if (conditionMap.get(c.getConditionId()) != null) {
- cInst = (WorkflowConditionInstance) conditionMap.get(c
- .getConditionId());
- }
- /* if not, then go ahead and create it and cache it */
- else {
- cInst = GenericWorkflowObjectFactory
- .getConditionObjectFromClassName(c
- .getConditionInstanceClassName());
- conditionMap.put(c.getConditionId(), cInst);
- }
- }
- /* no conditions cached yet, so set everything up */
- else {
- HashMap<String, WorkflowConditionInstance> conditionMap = new HashMap<String, WorkflowConditionInstance>();
- cInst = GenericWorkflowObjectFactory.getConditionObjectFromClassName(c
- .getConditionInstanceClassName());
- conditionMap.put(c.getConditionId(), cInst);
- CONDITION_CACHE.put(taskId, conditionMap);
- }
-
- // actually perform the evaluation
- boolean result = false;
- if (!(result = cInst.evaluate(this.workflowInstance.getSharedContext(),
- c.getCondConfig())) && !isOptional(c, result) && !timedOut(c)){
- return false;
- }
- }
-
- return true;
- }
-
- protected boolean isOptional(WorkflowCondition condition, boolean result) {
- if (condition.isOptional()) {
- LOG.log(Level.WARNING, "Condition: [" + condition.getConditionId()
- + "] is optional: evaluation results: ["+result+"] ignored");
- return true;
- } else {
- LOG.log(Level.INFO, "Condition: [" + condition.getConditionId()
- + "] is required: evaluation results: ["+result+"] included.");
- return false;
- }
- }
-
- protected boolean timedOut(WorkflowCondition condition) {
- if (condition.getTimeoutSeconds() == -1)
- return false;
- String isoStartDateTimeStr = COND_TIMEOUTS.get(condition.getConditionId());
- Date isoStartDateTime = null;
- try {
- isoStartDateTime = DateConvert.isoParse(isoStartDateTimeStr);
- } catch (Exception e) {
- e.printStackTrace();
- LOG.log(Level.WARNING, "Unable to parse start date time for condition: ["
- + condition.getConditionId() + "]: start date time: ["
- + isoStartDateTimeStr + "]: Reason: " + e.getMessage());
- return false;
- }
- Date now = new Date();
- long numSecondsElapsed = (now.getTime() - isoStartDateTime.getTime()) / (1000);
- if (numSecondsElapsed >= condition.getTimeoutSeconds()) {
- LOG.log(
- Level.INFO,
- "Condition: [" + condition.getConditionName()
- + "]: exceeded timeout threshold of: ["
- + condition.getTimeoutSeconds() + "] seconds: elapsed time: ["
- + numSecondsElapsed + "]");
- return true;
- } else{
- LOG.log(
- Level.FINEST,
- "Condition: [" + condition.getConditionName()
- + "]: has not exceeded timeout threshold of: ["
- + condition.getTimeoutSeconds() + "] seconds: elapsed time: ["
- + numSecondsElapsed + "]");
- return false;
- }
- }
-
protected String getTaskNameById(String taskId) {
for (WorkflowTask task : (List<WorkflowTask>) (List<?>) this.workflowInstance
.getWorkflow().getTasks()) {
@@ -370,9 +261,5 @@ public abstract class WorkflowProcessor
}
return null;
}
-
- private String generateISO8601(){
- return DateConvert.isoFormat(new Date());
- }
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java?rev=1157470&r1=1157469&r2=1157470&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java Sun Aug 14 05:39:56 2011
@@ -516,6 +516,11 @@ public class LuceneWorkflowInstanceRepos
// store the tasks
addTasksToDoc(doc, workflowInst.getWorkflow().getTasks());
+
+ // store workflow conditions
+ addConditionsToDoc("workflow_condition_"+workflowInst.getWorkflow().getId(),
+ workflowInst.getWorkflow().getConditions()
+ , doc);
// add the default field (so that we can do a query for *)
doc.add(new Field("myfield", "myvalue", Field.Store.YES,
@@ -644,6 +649,7 @@ public class LuceneWorkflowInstanceRepos
workflow.setId(doc.get("workflow_id"));
workflow.setName(doc.get("workflow_name"));
workflow.setTasks(toTasks(doc));
+ workflow.setConditions(toConditions("workflow_condition_"+workflow.getId(), doc));
inst.setWorkflow(workflow);