You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2011/07/04 23:25:58 UTC
svn commit: r1142815 - in /oodt/trunk: ./
workflow/src/main/java/org/apache/oodt/cas/workflow/repository/
workflow/src/main/java/org/apache/oodt/cas/workflow/util/
workflow/src/main/resources/ workflow/src/main/resources/examples/wengine/
workflow/src/...
Author: mattmann
Date: Mon Jul 4 21:25:58 2011
New Revision: 1142815
URL: http://svn.apache.org/viewvc?rev=1142815&view=rev
Log:
- fix for OODT-70 Add ability for sequential and parallel task specifications for Workflows
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepositoryFactory.java
oodt/trunk/workflow/src/main/resources/examples/wengine/
oodt/trunk/workflow/src/main/resources/examples/wengine/GranuleMaps.xml
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java
Modified:
oodt/trunk/CHANGES.txt
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java
oodt/trunk/workflow/src/main/resources/workflow.properties
Modified: oodt/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1142815&r1=1142814&r2=1142815&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Mon Jul 4 21:25:58 2011
@@ -2,6 +2,9 @@ Apache OODT Change Log
======================
Release 0.4: Current Development
+--------------------------------------------
+* OODT-70 Add ability for sequential and parallel task
+ specifications for Workflows (mattmann, bfoster)
* OODT-295 BasicVersioner doesn't work with Hierarchical
Products (mattmann, Tim Stough)
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java?rev=1142815&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepository.java Mon Jul 4 21:25:58 2011
@@ -0,0 +1,1009 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.repository;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowConditionConfiguration;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.structs.exceptions.RepositoryException;
+import org.apache.oodt.cas.workflow.structs.exceptions.WorkflowTaskInstanceException;
+import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManagerClient;
+import org.apache.oodt.cas.workflow.util.XmlStructFactory;
+
+//JDK imports
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import org.w3c.dom.Attr;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ *
+ *
+ * Loads Workflow2 (WEngine) Style Workflow XML files.
+ *
+ * @author mattmann
+ * @author bfoster
+ */
+public class PackagedWorkflowRepository implements WorkflowRepository {
+
+ private List<File> files;
+
+ private List<String> processorIds = Arrays.asList(new String[] {
+ "sequential", "parallel", "condition", "task" });
+
+ private Map<String, ParentChildWorkflow> workflows;
+
+ private List<Graph> graphs;
+
+ private Map<String, WorkflowCondition> conditions;
+
+ private Map<String, WorkflowTask> tasks;
+
+ private Map<String, Metadata> globalConfGroups;
+
+ private Map<String, List<ParentChildWorkflow>> eventWorkflowMap;
+
+ private static final Logger LOG = Logger
+ .getLogger(PackagedWorkflowRepository.class.getName());
+
+ public PackagedWorkflowRepository(List<File> files)
+ throws InstantiationException {
+ this.files = files;
+ try {
+ this.init();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new InstantiationException(e.getMessage());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowByName
+ * (java.lang.String)
+ */
+ @Override
+ public Workflow getWorkflowByName(String workflowName)
+ throws RepositoryException {
+
+ for (Workflow w : this.workflows.values()) {
+ if (w.getName().equals(workflowName)) {
+ return w;
+ }
+ }
+
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowById
+ * (java.lang.String)
+ */
+ @Override
+ public Workflow getWorkflowById(String workflowId) throws RepositoryException {
+ return workflows.get(workflowId);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflows()
+ */
+ @Override
+ public List getWorkflows() throws RepositoryException {
+ List<Workflow> workflows = new Vector<Workflow>();
+ workflows.addAll(this.workflows.values());
+ return workflows;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getTasksByWorkflowId
+ * (java.lang.String)
+ */
+ @Override
+ public List getTasksByWorkflowId(String workflowId)
+ throws RepositoryException {
+ Workflow w = this.getWorkflowById(workflowId);
+ return w.getTasks();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#
+ * getTasksByWorkflowName(java.lang.String)
+ */
+ @Override
+ public List getTasksByWorkflowName(String workflowName)
+ throws RepositoryException {
+ Workflow w = this.getWorkflowByName(workflowName);
+ if (w != null) {
+ return w.getTasks();
+ } else
+ return Collections.emptyList();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowsForEvent
+ * (java.lang.String)
+ */
+ @Override
+ public List getWorkflowsForEvent(String eventName) throws RepositoryException {
+ List<ParentChildWorkflow> workflows = this.eventWorkflowMap.get(eventName);
+ if (workflows != null && workflows.size() > 0) {
+ return workflows;
+ } else
+ return Collections.emptyList();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#
+ * getConditionsByTaskName(java.lang.String)
+ */
+ @Override
+ public List getConditionsByTaskName(String taskName)
+ throws RepositoryException {
+
+ for (WorkflowTask task : this.tasks.values()) {
+ if (task.getTaskName().equals(taskName)) {
+ return task.getConditions();
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#
+ * getConditionsByTaskId(java.lang.String)
+ */
+ @Override
+ public List getConditionsByTaskId(String taskId) throws RepositoryException {
+ if (this.tasks.get(taskId) != null) {
+ return this.tasks.get(taskId).getConditions();
+ } else
+ return Collections.emptyList();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#
+ * getConfigurationByTaskId(java.lang.String)
+ */
+ @Override
+ public WorkflowTaskConfiguration getConfigurationByTaskId(String taskId)
+ throws RepositoryException {
+ return convertToTaskConfiguration(this.globalConfGroups.get(taskId));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowTaskById
+ * (java.lang.String)
+ */
+ @Override
+ public WorkflowTask getWorkflowTaskById(String taskId)
+ throws RepositoryException {
+ return this.tasks.get(taskId);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#
+ * getWorkflowConditionById(java.lang.String)
+ */
+ @Override
+ public WorkflowCondition getWorkflowConditionById(String conditionId)
+ throws RepositoryException {
+ return this.conditions.get(conditionId);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.repository.WorkflowRepository#getRegisteredEvents
+ * ()
+ */
+ @Override
+ public List getRegisteredEvents() throws RepositoryException {
+ return Arrays.asList(this.eventWorkflowMap.keySet().toArray());
+ }
+
+ private void init() throws RepositoryException {
+ this.workflows = new HashMap<String, ParentChildWorkflow>();
+ this.tasks = new HashMap<String, WorkflowTask>();
+ this.conditions = new HashMap<String, WorkflowCondition>();
+ this.eventWorkflowMap = new HashMap<String, List<ParentChildWorkflow>>();
+ this.globalConfGroups = new HashMap<String, Metadata>();
+ this.graphs = new Vector<Graph>();
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder parser = null;
+
+ try {
+ parser = factory.newDocumentBuilder();
+ List<Element> rootElements = new Vector<Element>();
+ for (File file : files)
+ rootElements.add(parser.parse(file).getDocumentElement());
+ for (Element root : rootElements) {
+ Metadata staticMetadata = new Metadata();
+ loadConfiguration(rootElements, root, staticMetadata);
+ loadTaskAndConditionDefinitions(rootElements, root, staticMetadata);
+ loadGraphs(rootElements, root, new Graph(), staticMetadata);
+ computeEvents();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RepositoryException(e.getMessage());
+ }
+ }
+
+ private void computeEvents() throws Exception {
+ List<ParentChildWorkflow> workflows = new Vector<ParentChildWorkflow>();
+ for (ParentChildWorkflow w : this.workflows.values()) {
+ workflows.add(w);
+
+ }
+ for (ParentChildWorkflow workflow : workflows) {
+
+ // event for each workflow id
+ List<ParentChildWorkflow> wList = new Vector<ParentChildWorkflow>();
+ wList.add(workflow);
+ this.eventWorkflowMap.put(workflow.getId(), wList);
+
+ // clear its tasks, we are going to re-add them back
+ workflow.getTasks().clear();
+ List<Graph> children = workflow.getGraph().getChildren();
+ if (workflow.getGraph().getExecutionType().equals("sequential")) {
+ for (Graph child : children) {
+ if (child.getWorkflow() != null) {
+ workflow.getTasks().add(
+ generateRedirector(child.getWorkflow().getId()));
+ } else if (child.getTask() != null) {
+ workflow.getTasks().add(child.getTask());
+ }
+ }
+ } else if (workflow.getGraph().getExecutionType().equals("parallel")) {
+ // clear it as a workflow from the list
+ // to begin with
+ this.eventWorkflowMap.get(workflow.getId()).clear();
+ this.workflows.remove(workflow.getId());
+ for (Graph child : children) {
+ if (child.getWorkflow() != null) {
+ // add child workflow to the event kickoff for this id
+ this.eventWorkflowMap.get(workflow.getId())
+ .add(child.getWorkflow());
+ } else if (child.getTask() != null) {
+ // add a new dynamic workflow
+ // with just this task
+ ParentChildWorkflow w = getDynamicWorkflow(child.getTask());
+ this.eventWorkflowMap.get(workflow.getId()).add(w);
+ }
+ }
+ } else
+ throw new Exception("Unsupported execution type: ["
+ + workflow.getGraph().getExecutionType() + "]");
+ }
+ }
+
+ private void loadTaskAndConditionDefinitions(List<Element> rootElements,
+ Element rootElem, Metadata staticMetadata) throws Exception {
+
+ List<Element> conditionBlocks = this.getChildrenByTagName(rootElem,
+ "condition");
+ List<Element> taskBlocks = this.getChildrenByTagName(rootElem, "task");
+
+ if (conditionBlocks != null && conditionBlocks.size() > 0) {
+ LOG.log(Level.FINER, "Loading: [" + conditionBlocks.size()
+ + "] conditions from: ["
+ + rootElem.getOwnerDocument().getDocumentURI() + "]");
+
+ for (Element condElem : conditionBlocks) {
+ loadGraphs(rootElements, condElem, new Graph(), staticMetadata);
+ }
+
+ }
+
+ if (taskBlocks != null && taskBlocks.size() > 0) {
+ LOG.log(Level.FINER, "Loading: [" + taskBlocks.size() + "] tasks from: ["
+ + rootElem.getOwnerDocument().getDocumentURI() + "]");
+ for (Element taskElem : taskBlocks) {
+ loadGraphs(rootElements, taskElem, new Graph(), staticMetadata);
+ }
+ }
+ }
+
+ private void loadGraphs(List<Element> rootElements, Element graphElem,
+ Graph parent, Metadata staticMetadata) throws Exception {
+
+ LOG.log(Level.FINEST, "Visiting node: [" + graphElem.getNodeName() + "]");
+ loadConfiguration(rootElements, graphElem, staticMetadata);
+ Graph graph = !graphElem.getNodeName().equals("cas:workflows") ? new Graph(
+ graphElem, staticMetadata) : new Graph();
+ parent.getChildren().add(graph);
+ graph.setParent(parent);
+ if (!graphElem.getNodeName().equals("cas:workflows")) {
+ expandWorkflowTasksAndConditions(graph, staticMetadata);
+ }
+
+ for (String processorType : this.processorIds) {
+ LOG.log(Level.FINE, "Scanning for: [" + processorType + "] nodes");
+ List<Element> procTypeBlocks = this.getChildrenByTagName(graphElem,
+ processorType);
+ if (procTypeBlocks != null && procTypeBlocks.size() > 0) {
+ LOG.log(Level.FINE, "Found: [" + procTypeBlocks.size() + "] ["
+ + processorType + "] processor types");
+ for (int i = 0; i < procTypeBlocks.size(); i++) {
+ loadGraphs(rootElements, procTypeBlocks.get(i), graph, staticMetadata);
+ }
+ } else {
+ if (processorType.equals("condition")) {
+ NodeList procTypeBlockNodes = graphElem
+ .getElementsByTagName("condition");
+ if (procTypeBlockNodes != null && procTypeBlockNodes.getLength() > 0) {
+ LOG.log(Level.FINE, "Found: [" + procTypeBlockNodes.getLength()
+ + "] linked condition definitions");
+ for (int i = 0; i < procTypeBlockNodes.getLength(); i++) {
+ loadGraphs(rootElements, (Element) procTypeBlockNodes.item(i),
+ graph, staticMetadata);
+ }
+ }
+ }
+ }
+ }
+
+ if (graphElem.getNodeName().equals("cas:workflows"))
+ return;
+ }
+
+ private void loadConfiguration(List<Element> rootElements, Node workflowNode,
+ Metadata staticMetadata) throws Exception {
+ NodeList children = workflowNode.getChildNodes();
+ for (int i = 0; i < children.getLength(); i++) {
+ Node curChild = children.item(i);
+
+ if (curChild.getNodeName().equals("configuration")) {
+ Metadata curMetadata = new Metadata();
+ if (!((Element) curChild).getAttribute("extends").equals(""))
+ for (String extension : ((Element) curChild).getAttribute("extends")
+ .split(","))
+ curMetadata
+ .replaceMetadata(globalConfGroups.containsKey(extension) ? globalConfGroups
+ .get(extension) : this.loadConfGroup(rootElements,
+ extension, globalConfGroups));
+ curMetadata.replaceMetadata(XmlStructFactory
+ .getConfigurationAsMetadata(curChild));
+ NamedNodeMap attrMap = curChild.getAttributes();
+ String configName = null;
+ for (int j = 0; j < attrMap.getLength(); j++) {
+ Attr attr = (Attr) attrMap.item(j);
+ if (attr.getName().equals("name")) {
+ configName = attr.getValue();
+ }
+ }
+
+ if (configName == null || (configName != null && configName.equals(""))) {
+ NamedNodeMap workflowNodeAttrs = workflowNode.getAttributes();
+ for (int j = 0; j < workflowNodeAttrs.getLength(); j++) {
+ Attr attr = (Attr) workflowNodeAttrs.item(j);
+ if (attr.getName().equals("id")) {
+ configName = attr.getValue();
+ }
+ }
+ }
+
+ this.globalConfGroups.put(configName, curMetadata);
+ staticMetadata.replaceMetadata(curMetadata);
+ }
+ }
+ }
+
+ private Metadata loadConfGroup(List<Element> rootElements, String group,
+ Map<String, Metadata> globalConfGroups) throws Exception {
+ for (final Element rootElement : rootElements) {
+ NodeList nodes = rootElement.getElementsByTagName("configuration");
+ for (int i = 0; i < nodes.getLength(); i++) {
+ Node node = nodes.item(i);
+ String name = ((Element) node).getAttribute("name");
+ if (name.equals(group))
+ return XmlStructFactory.getConfigurationAsMetadata(node);
+ }
+ }
+ throw new Exception("Configuration group '" + group + "' not defined!");
+ }
+
+ private void expandWorkflowTasksAndConditions(Graph graph,
+ Metadata staticMetadata) throws Exception {
+ if (graph.getExecutionType().equals("workflow")
+ || graph.getExecutionType().equals("sequential")
+ || graph.getExecutionType().equals("parallel")) {
+ ParentChildWorkflow workflow = new ParentChildWorkflow(graph);
+ workflow.setId(graph.getModelId());
+ workflow.setName(graph.getModelName());
+ graph.setWorkflow(workflow);
+ if (graph.getParent() == null
+ || (graph.getParent() != null && graph.getParent().getWorkflow() == null)) {
+ LOG.log(Level.FINEST, "Workflow: [" + graph.getModelId()
+ + "] has no parent: it's a top-level workflow");
+ }
+
+ if (workflow.getName() == null
+ || (workflow.getName() != null && workflow.getName().equals(""))) {
+ workflow.setName(graph.getExecutionType() + "-" + workflow.getId());
+ }
+ this.workflows.put(graph.getModelId(), workflow);
+ } else if (graph.getExecutionType().equals("condition")) {
+ WorkflowCondition cond = null;
+
+ if (graph.getModelIdRef() != null && !graph.getModelIdRef().equals("")) {
+ cond = this.conditions.get(graph.getModelIdRef());
+ } else {
+ cond = new WorkflowCondition();
+ cond.setConditionId(graph.getModelId());
+ cond.setConditionName(graph.getModelName());
+ cond.setConditionInstanceClassName(graph.getClazz());
+ cond.setCondConfig(convertToConditionConfiguration(staticMetadata));
+
+ if (cond.getConditionName() == null
+ || (cond.getConditionName() != null && cond.getConditionName()
+ .equals(""))) {
+ cond.setConditionName(cond.getConditionId());
+ }
+ this.conditions.put(graph.getModelId(), cond);
+
+ }
+
+ graph.setCond(cond);
+ if (graph.getParent() != null) {
+ if (graph.getParent().getWorkflow() != null) {
+ LOG.log(
+ Level.WARNING,
+ "wengine feature not supported yet: condition: ["
+ + cond.getConditionId() + "] has workflow parent: ["
+ + graph.getParent().getWorkflow().getId() + "]");
+ } else if (graph.getParent().getTask() != null) {
+ graph.getParent().getTask().getConditions().add(cond);
+ } else {
+ LOG.log(Level.FINEST, "Condition: [" + graph.getModelId()
+ + "] has not parent: it's a condition definition");
+ }
+ } else {
+ LOG.log(Level.FINEST, "Condition: [" + graph.getModelId()
+ + "]: parent is null");
+ }
+ // if parent doesn't have task or workflow set, then its parent
+ // is null and it's a condition definition, just add it
+
+ } else if (graph.getExecutionType().equals("task")) {
+ WorkflowTask task = null;
+ if (graph.getModelIdRef() != null && !graph.getModelIdRef().equals("")) {
+ LOG.log(Level.FINER, "Model ID-Ref to: [" + graph.getModelIdRef() + "]");
+ task = this.tasks.get(graph.getModelIdRef());
+ } else {
+ task = new WorkflowTask();
+ task.setTaskId(graph.getModelId());
+ task.setTaskName(graph.getModelName());
+ task.setTaskConfig(convertToTaskConfiguration(staticMetadata));
+ task.setTaskInstanceClassName(graph.getClazz());
+
+ if (task.getTaskName() == null
+ || (task.getTaskName() != null && task.getTaskName().equals(""))) {
+ task.setTaskName(task.getTaskId());
+ }
+ this.tasks.put(graph.getModelId(), task);
+ }
+
+ graph.setTask(task);
+ if (graph.getParent() != null) {
+ if (graph.getParent().getWorkflow() != null) {
+ graph.getParent().getWorkflow().getTasks().add(task);
+ } else {
+ LOG.log(Level.FINEST, "Task: [" + graph.getModelId()
+ + "] has no parent: it's a task definition");
+ }
+ } else {
+ LOG.log(Level.FINEST, "Task: [" + graph.getModelId()
+ + "]: parent is null");
+ }
+ }
+
+ }
+
+ private ParentChildWorkflow getDynamicWorkflow(WorkflowTask task) {
+ ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
+ workflow.setId("parallel-" + UUID.randomUUID().toString());
+ workflow.setName("Parallel Single Task " + task.getTaskName());
+ workflow.getTasks().add(task);
+ this.workflows.put(workflow.getId(), workflow);
+ return workflow;
+ }
+
+ private WorkflowTask generateRedirector(String eventName) {
+ WorkflowTask task = new WorkflowTask();
+ WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
+ config.addConfigProperty("eventName", eventName);
+ task.setTaskId("redirector-" + UUID.randomUUID().toString());
+ task.setTaskName("Redirector Task");
+ task.setTaskInstanceClassName(BranchRedirector.class.getName());
+ this.tasks.put(task.getTaskId(), task);
+ return task;
+ }
+
+ private WorkflowTaskConfiguration convertToTaskConfiguration(Metadata met) {
+ WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
+ for (String key : met.getAllKeys()) {
+ config.addConfigProperty(key, met.getMetadata(key));
+ }
+ return config;
+ }
+
+ private WorkflowConditionConfiguration convertToConditionConfiguration(
+ Metadata met) {
+ WorkflowConditionConfiguration config = new WorkflowConditionConfiguration();
+ for (String key : met.getAllKeys()) {
+ config.addConfigProperty(key, met.getMetadata(key));
+ }
+ return config;
+ }
+
+ private boolean checkValue(String value) {
+ return value != null && !value.equals("");
+ }
+
+ /**
+ * Taken from: http://stackoverflow.com/questions/1241525/java-element-
+ * getelementsbytagname-restrict-to-top-level
+ */
+ private List<Element> getChildrenByTagName(Element parent, String name) {
+ List<Element> nodeList = new Vector<Element>();
+ for (Node child = parent.getFirstChild(); child != null; child = child
+ .getNextSibling()) {
+ if (child.getNodeType() == Node.ELEMENT_NODE
+ && name.equals(child.getNodeName())) {
+ nodeList.add((Element) child);
+ }
+ }
+
+ return nodeList;
+ }
+
+ private class Graph {
+
+ private WorkflowTask task;
+
+ private WorkflowCondition cond;
+
+ private ParentChildWorkflow workflow;
+
+ private String modelIdRef;
+
+ private String modelId;
+
+ private String modelName;
+
+ private String alias;
+
+ private String executionType;
+
+ private String minReqSuccessfulSubProcessors;
+
+ private List<String> excused;
+
+ private String clazz;
+
+ private Graph parent;
+
+ private List<Graph> children;
+
+ public Graph(Element graphElem, Metadata staticMetadata) throws Exception {
+ this();
+ this.modelId = graphElem.getAttribute("id");
+ this.modelName = graphElem.getAttribute("name");
+ this.clazz = graphElem.getAttribute("class");
+ this.modelIdRef = graphElem.getAttribute("id-ref");
+ this.excused.addAll(Arrays.asList(graphElem.getAttribute("excused")
+ .split(",")));
+ this.alias = graphElem.getAttribute("alias");
+ this.minReqSuccessfulSubProcessors = graphElem.getAttribute("min");
+ this.executionType = graphElem.getAttribute("execution");
+
+ NamedNodeMap attrMap = graphElem.getAttributes();
+ for (int i = 0; i < attrMap.getLength(); i++) {
+ Attr attr = (Attr) attrMap.item(i);
+ if (attr.getName().startsWith("p:")) {
+ staticMetadata.replaceMetadata(attr.getName().substring(2),
+ attr.getValue());
+ }
+ }
+
+ if ((graphElem.getNodeName().equals("workflow") || graphElem
+ .getNodeName().equals("conditions")) && this.executionType == null) {
+ throw new Exception("workflow model '" + graphElem.getNodeName()
+ + "' missing execution type");
+ } else {
+ this.executionType = graphElem.getNodeName();
+ }
+
+ if (!processorIds.contains(this.executionType))
+ throw new Exception("Unsupported execution type id '"
+ + this.executionType + "'");
+
+ if (!checkValue(this.modelId) && !checkValue(this.modelIdRef)) {
+ this.modelId = UUID.randomUUID().toString();
+ }
+
+ if (this.alias != null && !this.alias.equals("")) {
+ this.modelId = this.alias;
+ }
+ }
+
+ public Graph() {
+ this.task = null;
+ this.cond = null;
+ this.workflow = null;
+ this.modelIdRef = null;
+ this.modelId = null;
+ this.modelName = null;
+ this.alias = null;
+ this.executionType = null;
+ this.minReqSuccessfulSubProcessors = null;
+ this.excused = new Vector<String>();
+ this.clazz = null;
+ this.children = new Vector<Graph>();
+ this.parent = null;
+ }
+
+ /**
+ * @return the parent
+ */
+ public Graph getParent() {
+ return parent;
+ }
+
+ /**
+ * @param parent
+ * the parent to set
+ */
+ public void setParent(Graph parent) {
+ this.parent = parent;
+ }
+
+ /**
+ * @return the children
+ */
+ public List<Graph> getChildren() {
+ return children;
+ }
+
+ /**
+ * @param children
+ * the children to set
+ */
+ public void setChildren(List<Graph> children) {
+ this.children = children;
+ }
+
+ /**
+ * @return the modelIdRef
+ */
+ public String getModelIdRef() {
+ return modelIdRef;
+ }
+
+ /**
+ * @param modelIdRef
+ * the modelIdRef to set
+ */
+ public void setModelIdRef(String modelIdRef) {
+ this.modelIdRef = modelIdRef;
+ }
+
+ /**
+ * @return the modelId
+ */
+ public String getModelId() {
+ return modelId;
+ }
+
+ /**
+ * @param modelId
+ * the modelId to set
+ */
+ public void setModelId(String modelId) {
+ this.modelId = modelId;
+ }
+
+ /**
+ * @return the modelName
+ */
+ public String getModelName() {
+ return modelName;
+ }
+
+ /**
+ * @param modelName
+ * the modelName to set
+ */
+ public void setModelName(String modelName) {
+ this.modelName = modelName;
+ }
+
+ /**
+ * @return the alias
+ */
+ public String getAlias() {
+ return alias;
+ }
+
+ /**
+ * @param alias
+ * the alias to set
+ */
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+
+ /**
+ * @return the executionType
+ */
+ public String getExecutionType() {
+ return executionType;
+ }
+
+ /**
+ * @param executionType
+ * the executionType to set
+ */
+ public void setExecutionType(String executionType) {
+ this.executionType = executionType;
+ }
+
+ /**
+ * @return the minReqSuccessfulSubProcessors
+ */
+ public String getMinReqSuccessfulSubProcessors() {
+ return minReqSuccessfulSubProcessors;
+ }
+
+ /**
+ * @param minReqSuccessfulSubProcessors
+ * the minReqSuccessfulSubProcessors to set
+ */
+ public void setMinReqSuccessfulSubProcessors(
+ String minReqSuccessfulSubProcessors) {
+ this.minReqSuccessfulSubProcessors = minReqSuccessfulSubProcessors;
+ }
+
+ /**
+ * @return the excused
+ */
+ public List<String> getExcused() {
+ return excused;
+ }
+
+ /**
+ * @param excused
+ * the excused to set
+ */
+ public void setExcused(List<String> excused) {
+ this.excused = excused;
+ }
+
+ /**
+ * @return the clazz
+ */
+ public String getClazz() {
+ return clazz;
+ }
+
+ /**
+ * @param clazz
+ * the clazz to set
+ */
+ public void setClazz(String clazz) {
+ this.clazz = clazz;
+ }
+
+ /**
+ * @return the task
+ */
+ public WorkflowTask getTask() {
+ return task;
+ }
+
+ /**
+ * @param task
+ * the task to set
+ */
+ public void setTask(WorkflowTask task) {
+ this.task = task;
+ }
+
+ /**
+ * @return the cond
+ */
+ public WorkflowCondition getCond() {
+ return cond;
+ }
+
+ /**
+ * @param cond
+ * the cond to set
+ */
+ public void setCond(WorkflowCondition cond) {
+ this.cond = cond;
+ }
+
+ /**
+ * @return the workflow
+ */
+ public ParentChildWorkflow getWorkflow() {
+ return workflow;
+ }
+
+ /**
+ * @param workflow
+ * the workflow to set
+ */
+ public void setWorkflow(ParentChildWorkflow workflow) {
+ this.workflow = workflow;
+ }
+
+ public String toString() {
+ return this.modelId;
+ }
+
+ }
+
+ private class ParentChildWorkflow extends Workflow {
+
+ private Graph graph;
+
+ public ParentChildWorkflow(Graph graph) {
+ this.graph = graph;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer("[workflow id=");
+ buf.append(this.getId());
+ buf.append(",name=");
+ buf.append(this.getName());
+ buf.append(",parent=");
+ buf.append(this.graph.parent.modelId);
+ buf.append(",children=");
+ buf.append(this.graph.children);
+ buf.append(",executionType=");
+ buf.append(this.graph.executionType);
+ buf.append(",tasks=");
+ for (WorkflowTask task : (List<WorkflowTask>) this.getTasks()) {
+ buf.append("[task name=");
+ buf.append(task.getTaskName());
+ buf.append(",id=");
+ buf.append(task.getTaskId());
+ buf.append(",instanceClass=");
+ buf.append(task.getTaskInstanceClassName());
+ buf.append(",requiredMet=");
+ buf.append(task.getRequiredMetFields());
+
+ buf.append(",conditions=");
+ for (WorkflowCondition cond : (List<WorkflowCondition>) task
+ .getConditions()) {
+ buf.append("[condition name=");
+ buf.append(cond.getConditionName());
+ buf.append(",id=");
+ buf.append(cond.getConditionId());
+ buf.append(",instanceClass=");
+ buf.append(cond.getConditionInstanceClassName());
+ buf.append(",config=");
+ buf.append(cond.getCondConfig().getProperties());
+ buf.append("]");
+ }
+
+ buf.append("]");
+ }
+
+ return buf.toString();
+ }
+
+ /**
+ * @return the graph
+ */
+ public Graph getGraph() {
+ return graph;
+ }
+
+ /**
+ * @param graph
+ * the graph to set
+ */
+ public void setGraph(Graph graph) {
+ this.graph = graph;
+ }
+
+ }
+
+ private class BranchRedirector implements WorkflowTaskInstance {
+
+ public BranchRedirector() {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance#run(org.apache
+ * .oodt.cas.metadata.Metadata,
+ * org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration)
+ */
+ @Override
+ public void run(Metadata metadata, WorkflowTaskConfiguration config)
+ throws WorkflowTaskInstanceException {
+ XmlRpcWorkflowManagerClient wm = null;
+
+ try {
+ wm = new XmlRpcWorkflowManagerClient(new URL(
+ metadata.getMetadata(CoreMetKeys.WORKFLOW_MANAGER_URL)));
+ wm.sendEvent(config.getProperty("eventName"), metadata);
+ } catch (Exception e) {
+ throw new WorkflowTaskInstanceException(e.getMessage());
+ }
+ }
+
+ }
+
+}
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepositoryFactory.java?rev=1142815&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepositoryFactory.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/repository/PackagedWorkflowRepositoryFactory.java Mon Jul 4 21:25:58 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.repository;
+
+//JDK imports
+import java.io.File;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ * Constructs {@link PackagedWorkflowRepository}s.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class PackagedWorkflowRepositoryFactory implements
+ WorkflowRepositoryFactory {
+
+ private String wDirPath;
+
+ private static final Logger LOG = Logger
+ .getLogger(PackagedWorkflowRepositoryFactory.class.getName());
+
+ public PackagedWorkflowRepositoryFactory() throws InstantiationException {
+ this.wDirPath = System
+ .getProperty("org.apache.oodt.cas.workflow.wengine.packagedRepo.dir.path");
+ if (this.wDirPath == null
+ || (this.wDirPath != null && !new File(wDirPath).isDirectory())) {
+ throw new InstantiationException("Must specify valid directory path "
+ + "containing wengine-style workflow xml files! path specified: ["
+ + this.wDirPath + "]");
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.repository.WorkflowRepositoryFactory#
+ * createRepository()
+ */
+ @Override
+ public WorkflowRepository createRepository() {
+ try {
+ return new PackagedWorkflowRepository(
+ Arrays.asList(new File(this.wDirPath).listFiles()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.log(
+ Level.SEVERE,
+ "Unable to create packaged workflow repository! Reason: "
+ + e.getMessage());
+ return null;
+ }
+ }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java?rev=1142815&r1=1142814&r2=1142815&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/util/XmlStructFactory.java Mon Jul 4 21:25:58 2011
@@ -19,6 +19,7 @@
package org.apache.oodt.cas.workflow.util;
//JDK imports
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
@@ -30,6 +31,7 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Element;
//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.metadata.util.PathUtils;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowConditionConfiguration;
@@ -272,6 +274,30 @@ public final class XmlStructFactory {
return properties;
}
+
+ public static Metadata getConfigurationAsMetadata(Node configNode)
+ throws Exception {
+ Metadata curMetadata = new Metadata();
+ NodeList curGrandChildren = configNode.getChildNodes();
+ for (int k = 0; k < curGrandChildren.getLength(); k++) {
+ if (curGrandChildren.item(k).getNodeName().equals("property")) {
+ Element property = (Element) curGrandChildren.item(k);
+ String delim = property.getAttribute("delim");
+ String envReplace = property.getAttribute("envReplace");
+ String name = property.getAttribute("name");
+ String value = property.getAttribute("value");
+ if (Boolean.parseBoolean(envReplace))
+ value = PathUtils.doDynamicReplacement(value);
+ List<String> values = new Vector<String>();
+ if (delim.length() > 0)
+ values.addAll(Arrays.asList(value.split("\\" + delim)));
+ else
+ values.add(value);
+ curMetadata.replaceMetadata(name, values);
+ }
+ }
+ return curMetadata;
+ }
private static Element getFirstElement(String name, Element root) {
NodeList list = root.getElementsByTagName(name);
Added: oodt/trunk/workflow/src/main/resources/examples/wengine/GranuleMaps.xml
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/resources/examples/wengine/GranuleMaps.xml?rev=1142815&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/resources/examples/wengine/GranuleMaps.xml (added)
+++ oodt/trunk/workflow/src/main/resources/examples/wengine/GranuleMaps.xml Mon Jul 4 21:25:58 2011
@@ -0,0 +1,247 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<cas:workflows
+ xmlns="http://oodt.jpl.nasa.gov/2.0/cas"
+ xmlns:cas="http://oodt.jpl.nasa.gov/2.0/cas"
+ xmlns:p="http://oodt.jpl.nasa.gov/2.0/cas/property">
+
+ <sequential id="urn:npp:GranuleMaps">
+ <configuration>
+
+ <!-- PCS properties -->
+ <property name="PGETask/Query/FileManagerUrl" value="[FILEMGR_URL]" envReplace="true"/>
+ <property name="PGETask/Ingest/FileManagerUrl" value="[FILEMGR_URL]" envReplace="true"/>
+ <property name="PGETask/Ingest/ClientTransferServiceFactory" value="org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory"/>
+ <property name="PGETask/Ingest/MetFileExtension" value="cas"/>
+ <property name="PGETask/Ingest/CrawlerCrawlForDirs" value="false"/>
+ <property name="PGETask/Ingest/CrawlerRecur" value="false"/>
+ <property name="PGETask/Ingest/ActionsIds" value="RmDataFile"/>
+ <property name="PGETask/Ingest/ActionRepoFile" value="file:[PCS_HOME]/core/pge/policy/action-beans.xml" envReplace="true"/>
+
+ <!-- Timeout properties -->
+ <property name="PGETask/Condition/Timeout" value="30000"/>
+
+ <!-- metadata element names -->
+ <property name="PGETask/Condition/FilenameKey" value="Filename"/>
+ <property name="PGETask/Condition/StartDateTimeKey" value="StartDateTime"/>
+ <property name="PGETask/Condition/EndDateTimeKey" value="EndDateTime"/>
+ <property name="PGETask/Condition/SortByKey" value="StartDateTime"/>
+ <property name="PGETask/Condition/VersioningKey" value="ProductionDateTime"/>
+
+ <!-- query properties -->
+ <property name="PGETask/Condition/FilterAlgorClass" value="org.apache.oodt.cas.filemgr.structs.query.filter.WeightedHashFilterAlgor"/>
+ <property name="PGETask/Condition/MinNumOfFiles" value="1"/>
+ <property name="PGETask/Condition/MaxGapSize" value="-1"/>
+ <property name="PGETask/Condition/EpsilonInMillis" value="0"/>
+
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/ResultFormat" value="$FileLocation/$Filename"/>
+
+ <!-- Wait time between block and unblock in minutes -->
+ <property name="BlockTimeElapse" value="1"/>
+
+ </configuration>
+
+ <conditions execution="parallel" type="pre">
+ <condition id-ref="urn:npp:MOA_IASI_L1C_Daily"/>
+ <condition id-ref="urn:npp:MOA_MHS_L1B_Daily"/>
+ <condition id-ref="urn:npp:MOA_AMSUA_L1B_Daily"/>
+ </conditions>
+
+ <parallel>
+ <task id-ref="urn:npp:Orbits"/>
+ <task id-ref="urn:npp:MoaIasiGPolygon"/>
+ <task id-ref="urn:npp:MoaMhsGPolygon"/>
+ <task id-ref="urn:npp:MoaAmsuaGPolygon"/>
+ </parallel>
+
+ <parallel>
+ <conditions execution="sequential" type="pre">
+ <condition id-ref="urn:npp:MOA_ORBITS_FileBased"/>
+ </conditions>
+
+ <task id-ref="urn:npp:MoaIasiMap"/>
+ <task id-ref="urn:npp:MoaMhsMap"/>
+ <task id-ref="urn:npp:MoaAmsuaMap"/>
+
+ <!--conditions execution="sequential" type="post">
+ <condition id-ref="urn:npp:VerifyMapsExist" name="VerifyMapsExist"/>
+ </conditions-->
+
+ </parallel>
+
+ </sequential>
+
+
+ <!-- CONDITIONS -->
+ <condition id="urn:npp:MOA_IASI_L1C_Daily" name="MOA_IASI_L1C_Daily" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_IASI_L1C"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="480"/>
+ <property name="PGETask/Condition/EpsilonInMillis" value="30000"/>
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/IASI_L1FormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/IASI_L1FilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <condition id="urn:npp:MOA_MHS_L1B_Daily" name="MOA_MHS_L1B_Daily" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_MHS_L1B"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="15"/>
+ <property name="PGETask/Condition/EpsilonInMillis" value="60000"/> <!--2mins-->
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/MHS_L1FormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/MHS_L1FilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <condition id="urn:npp:MOA_AMSUA_L1B_Daily" name="MOA_MHS_L1B_Daily" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_AMSUA_L1B"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="15"/>
+ <property name="PGETask/Condition/EpsilonInMillis" value="60000"/> <!--2mins-->
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/AMSUA_L1FormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/AMSUA_L1FilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <condition id="urn:npp:MOA_ORBITS_FileBased" name="MOA_ORBITS_FileBased" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_ORBITS"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="1"/>
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/OrbitsFormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/OrbitsFilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <condition id="urn:npp:MOA_IASI_POLY_FileBased" name="MOA_IASI_POLY_FileBased" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_IASI_POLY"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="1"/>
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/IASI_GPolyFormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/IASI_GPolyFilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <condition id="urn:npp:MOA_MHS_POLY_FileBased" name="MOA_MHS_POLY_FileBased" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_MHS_POLY"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="1"/>
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/MHS_GPolyFormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/MHS_GPolyFilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <condition id="urn:npp:MOA_AMSUA_POLY_FileBased" name="MOA_AMSUA_POLY_FileBased" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition">
+ <configuration>
+ <!-- query properties -->
+ <property name="PGETask/Condition/ProductTypeNames" value="MOA_AMSUA_POLY"/>
+ <property name="PGETask/Condition/ExpectedNumOfFiles" value="1"/>
+ <!-- formatting properties for queries -->
+ <property name="PGETask/Condition/FormattedSqlQueryKey" value="Runtime/Condition/AMSUA_GPolyFormattedQuery"/>
+ <property name="PGETask/Condition/FilenamesSqlQueryKey" value="Runtime/Condition/AMSUA_GPolyFilenamesQuery"/>
+ <property name="QueueName" value="java"/>
+ </configuration>
+ </condition>
+ <!--condition id="urn:npp:VerifyMapsExist" name="VerifyMapsExist" class="org.apache.oodt.cas.pge.condition.PGETaskWorkflowCondition"/-->
+
+
+ <!-- TASKS -->
+ <task id="urn:npp:Orbits" name="MoaOrbits" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <configuration>
+ <property name="PGETask/Name" value="MoaOrbits" />
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaOrbits/SPSS_Version]/config/PgeConfig_MoaOrbits.xml"/>
+ <property name="Runtime/Condition/L1FormattedQuery" value="[Runtime/Condition/IASI_L1FormattedQuery]"/>
+ <property name="Runtime/Condition/L1FilenamesQuery" value="[Runtime/Condition/IASI_L1FilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaOrbits/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaOrbits/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="MetOpS"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+ <task id="urn:npp:MoaIasiGPolygon" name="MoaIasiGPolygon" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <configuration>
+ <property name="PGETask/Name" value="MoaIasiGPolygon"/>
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaIasiGPolygon/SPSS_Version]/config/PgeConfig_MoaGPolygon.xml"/>
+ <property name="Runtime/Condition/L1FormattedQuery" value="[Runtime/Condition/IASI_L1FormattedQuery]"/>
+ <property name="Runtime/Condition/L1FilenamesQuery" value="[Runtime/Condition/IASI_L1FilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaIasiGPolygon/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaIasiGPolygon/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="IASI"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+ <task id="urn:npp:MoaMhsGPolygon" name="MoaMhsGPolygon" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <configuration>
+ <property name="PGETask/Name" value="MoaAmsuaGPolygon"/>
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaAmsuaGPolygon/SPSS_Version]/config/PgeConfig_MoaGPolygon.xml"/>
+ <property name="Runtime/Condition/L1FormattedQuery" value="[Runtime/Condition/AMSUA_L1FormattedQuery]"/>
+ <property name="Runtime/Condition/L1FilenamesQuery" value="[Runtime/Condition/AMSUA_L1FilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaAmsuaGPolygon/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaAmsuaGPolygon/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="AMSUA"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+ <task id="urn:npp:MoaAmsuaGPolygon" name="MoaAmsuaGPolygon" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <configuration>
+ <property name="PGETask/Name" value="MoaMhsGPolygon"/>
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaMhsGPolygon/SPSS_Version]/config/PgeConfig_MoaGPolygon.xml"/>
+ <property name="Runtime/Condition/L1FormattedQuery" value="[Runtime/Condition/MHS_L1FormattedQuery]"/>
+ <property name="Runtime/Condition/L1FilenamesQuery" value="[Runtime/Condition/MHS_L1FilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaMhsGPolygon/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaMhsGPolygon/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="MHS"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+ <task id="urn:npp:MoaIasiMap" name="MoaIasiMap" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <conditions execution="sequential" type="pre">
+ <condition id-ref="urn:npp:MOA_IASI_POLY_FileBased"/>
+ </conditions>
+ <configuration>
+ <property name="PGETask/Name" value="MoaIasiMap"/>
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaIasiMap/SPSS_Version]/config/PgeConfig_MoaMap.xml"/>
+ <property name="Runtime/Condition/GPolyFormattedQuery" value="[Runtime/Condition/IASI_GPolyFormattedQuery]"/>
+ <property name="Runtime/Condition/GPolyFilenamesQuery" value="[Runtime/Condition/IASI_GPolyFilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaIasiMap/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaIasiMap/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="IASI"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+ <task id="urn:npp:MoaMhsMap" name="MoaMhsMap" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <conditions execution="sequential" type="pre">
+ <condition id-ref="urn:npp:MOA_MHS_POLY_FileBased"/>
+ </conditions>
+ <configuration>
+ <property name="PGETask/Name" value="MoaMhsMap"/>
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaMhsMap/SPSS_Version]/config/PgeConfig_MoaMap.xml"/>
+ <property name="Runtime/Condition/GPolyFormattedQuery" value="[Runtime/Condition/MHS_GPolyFormattedQuery]"/>
+ <property name="Runtime/Condition/GPolyFilenamesQuery" value="[Runtime/Condition/MHS_GPolyFilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaMhsMap/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaMhsMap/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="MHS"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+ <task id="urn:npp:MoaAmsuaMap" name="MoaAmsuaMap" class="org.apache.oodt.cas.pge.StdPGETaskInstance">
+ <conditions execution="sequential" type="pre">
+ <condition id-ref="urn:npp:MOA_AMSUA_POLY_FileBased"/>
+ </conditions>
+ <configuration>
+ <property name="PGETask/Name" value="MoaAmsuaMap"/>
+ <property name="PGETask/ConfigFilePath" value="[SPSS_CONFIG_HOME]/[MoaAmsuaMap/SPSS_Version]/config/PgeConfig_MoaMap.xml"/>
+ <property name="Runtime/Condition/GPolyFormattedQuery" value="[Runtime/Condition/AMSUA_GPolyFormattedQuery]"/>
+ <property name="Runtime/Condition/GPolyFilenamesQuery" value="[Runtime/Condition/AMSUA_GPolyFilenamesQuery]"/>
+ <property name="Runtime/Condition/SharedMetout" value="[SPSS_CONFIG_HOME]/[MoaAmsuaMap/SPSS_Version]/metout/AllProducts_metadata.xml,[SPSS_CONFIG_HOME]/[MoaAmsuaMap/SPSS_Version]/metout/DailyProduct_metadata.xml"/>
+ <property name="Instrument" value="AMSUA"/>
+ <property name="QueueName" value="idl"/>
+ </configuration>
+ </task>
+
+</cas:workflows>
Modified: oodt/trunk/workflow/src/main/resources/workflow.properties
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/resources/workflow.properties?rev=1142815&r1=1142814&r2=1142815&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/resources/workflow.properties (original)
+++ oodt/trunk/workflow/src/main/resources/workflow.properties Mon Jul 4 21:25:58 2011
@@ -56,6 +56,9 @@ org.apache.oodt.cas.workflow.instanceRep
# XML workflow repository properties
org.apache.oodt.cas.workflow.repo.dirs=file:///path/to/your/repo1, file:///path/to/your/repo2
+# wengine-style packaged workflow repo properties
+org.apache.oodt.cas.workflow.wengine.packagedRepo.dir.path = /path/to/wengine/workflow/files
+
# data source workflow repository properties
org.apache.oodt.cas.workflow.repo.datasource.jdbc.url=jdbc:url
org.apache.oodt.cas.workflow.repo.datasource.jdbc.user=user
Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java?rev=1142815&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/repository/TestPackagedWorkflowRepository.java Mon Jul 4 21:25:58 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.repository;
+
+//JDK imports
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.oodt.cas.workflow.structs.Workflow;
+
+//Junit imports
+import junit.framework.TestCase;
+
+/**
+ *
+ * Test harness for the {@link PackagedWorkflowRepository}.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class TestPackagedWorkflowRepository extends TestCase {
+
+ private PackagedWorkflowRepository repo;
+
+ public TestPackagedWorkflowRepository() {
+ }
+
+ public void testDetectOuterLevelWorkflows() {
+ assertNotNull(this.repo);
+ List<Workflow> workflows = null;
+ try {
+ workflows = this.repo.getWorkflows();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ boolean foundGranuleMaps = false;
+ for (Workflow w : workflows) {
+ if (w.getId().equals("urn:npp:GranuleMaps")) {
+ foundGranuleMaps = true;
+ }
+ }
+
+ assertTrue(foundGranuleMaps);
+ }
+
+ public void testDetectInnerWorkflows() {
+ assertNotNull(this.repo);
+ List<String> events = null;
+
+ try {
+ events = this.repo.getRegisteredEvents();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ boolean foundFour = false, foundThree = false;
+ for (String event : events) {
+ List<Workflow> workflows = null;
+
+ try {
+ workflows = this.repo.getWorkflowsForEvent(event);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ assertNotNull(workflows);
+ assertTrue(workflows.size() > 0);
+ if (workflows.size() == 3) {
+ foundThree = true;
+ } else if (workflows.size() == 4) {
+ foundFour = true;
+ }
+
+ }
+
+ assertTrue(foundThree);
+ assertTrue(foundFour);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ @Override
+ protected void setUp() throws Exception {
+ repo = new PackagedWorkflowRepository(Collections.singletonList(new File(
+ "src/main/resources/examples/wengine/GranuleMaps.xml")));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#tearDown()
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ repo = null;
+ }
+
+}