You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/02/21 22:14:47 UTC
svn commit: r1292033 -
/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/
Author: mattmann
Date: Tue Feb 21 21:14:47 2012
New Revision: 1292033
URL: http://svn.apache.org/viewvc?rev=1292033&view=rev
Log:
OODT-215/OODT-381: Create Runner framework to allow flexible WorkflowTask execution on different runtimes WIP
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
+/**
+ *
+ * Runs a local version of a {@link WorkflowTask} asynchronously.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class AsynchronousLocalEngineRunner extends EngineRunner {
+
+ private static final Logger LOG = Logger
+ .getLogger(AsynchronousLocalEngineRunner.class.getName());
+
+ private ThreadPoolExecutor executor;
+
+ private Map<String, Thread> workerMap;
+
+ public AsynchronousLocalEngineRunner() {
+ this.executor = new ThreadPoolExecutor(0, 0, 0, TimeUnit.SECONDS, null,
+ new RejectedExecutionHandler() {
+
+ @Override
+ public void rejectedExecution(Runnable workflow,
+ ThreadPoolExecutor executor) {
+ // TODO Auto-generated method stub
+
+ }
+ });
+ this.workerMap = new HashMap<String, Thread>();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
+ * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+ */
+ @Override
+ public void execute(final WorkflowTask workflowTask,
+ final Metadata dynMetadata) throws Exception {
+ Thread worker = new Thread() {
+
+ @Override
+ public void run() {
+ WorkflowTaskInstance inst = GenericWorkflowObjectFactory
+ .getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
+ try {
+ inst.run(dynMetadata, workflowTask.getTaskConfig());
+ } catch (Exception e) {
+ LOG.log(Level.WARNING,
+ "Exception executing task: [" + workflowTask.getTaskName()
+ + "]: Message: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Thread#interrupt()
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public void interrupt() {
+ super.interrupt();
+ this.destroy();
+ }
+
+ };
+
+ String id = null;
+ synchronized (id) {
+ id = UUID.randomUUID().toString();
+ this.workerMap.put(id, worker);
+ this.executor.execute(worker);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
+ */
+ @Override
+ public void shutdown() throws Exception {
+ for (Thread worker : this.workerMap.values()) {
+ if (worker != null) {
+ worker.interrupt();
+ worker = null;
+ }
+ }
+
+ }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java?rev=1292033&r1=1292032&r2=1292033&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java Tue Feb 21 21:14:47 2012
@@ -115,7 +115,7 @@ public class ConditionProcessor {
} else {
LOG.log(Level.INFO, "Condition: [" + condition.getConditionId()
+ "] is required: evaluation results: [" + result + "] included.");
- return false;
+ return result;
}
}
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ *
+ * Obfuscates the underlying substrate on which a {@link WorkflowTask} should
+ * run. In short, executes a {@link WorkflowTask} for the Workflow Engine.
+ *
+ * @author bfoster
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public abstract class EngineRunner {
+
+ /**
+ * Executes a {@link WorkflowTask} on an execution substrate. Ideally there
+ * will only ever be two of these substrates, one for local execution, and
+ * another for communication with the Resource Manager.
+ *
+ * @param workflowTask
+ * The model of the {@link WorkflowTask} to instantiate and execute.
+ * @param dynMetadata
+ * The dynamic {@link Metadata} passed to this {@link WorkflowTask}.
+ *
+ * @throws Exception
+ * If any error occurs.
+ */
+ public abstract void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+ throws Exception;
+
+ /**
+ * Shuts this runner down and frees its resources.
+ *
+ * @throws Exception
+ * If any error occurs while freeing resources.
+ *
+ */
+ public abstract void shutdown() throws Exception;
+
+}
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
+import org.apache.oodt.cas.workflow.structs.TaskJobInput;
+import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ *
+ * Submits a {@link WorkflowTask} to the Resource Manager.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class ResourceRunner extends EngineRunner implements CoreMetKeys,
+ WorkflowStatus {
+
+ private static final Logger LOG = Logger.getLogger(ResourceRunner.class
+ .getName());
+
+ protected static final String DEFAULT_QUEUE_NAME = "high";
+
+ protected XmlRpcResourceManagerClient rClient;
+
+ private String currentJobId;
+
+ public ResourceRunner(URL resUrl) {
+ this.rClient = new XmlRpcResourceManagerClient(resUrl);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
+ * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+ */
+ @Override
+ public void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+ throws Exception {
+ Job workflowTaskJob = new Job();
+ workflowTaskJob.setName(workflowTask.getTaskId());
+ workflowTaskJob
+ .setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
+ workflowTaskJob
+ .setJobInputClassName("org.apache.oodt.cas.workflow.structs.TaskJobInput");
+ workflowTaskJob.setLoadValue(new Integer(2));
+ workflowTaskJob.setQueueName(workflowTask.getTaskConfig().getProperty(
+ QUEUE_NAME) != null ? workflowTask.getTaskConfig().getProperty(
+ QUEUE_NAME) : DEFAULT_QUEUE_NAME);
+
+ if (workflowTask.getTaskConfig().getProperty(TASK_LOAD) != null) {
+ workflowTaskJob.setLoadValue(Integer.valueOf(workflowTask.getTaskConfig()
+ .getProperty(TASK_LOAD)));
+ }
+
+ TaskJobInput in = new TaskJobInput();
+ in.setDynMetadata(dynMetadata);
+ in.setTaskConfig(workflowTask.getTaskConfig());
+ in.setWorkflowTaskInstanceClassName(workflowTask.getTaskInstanceClassName());
+
+ try {
+ this.currentJobId = rClient.submitJob(workflowTaskJob, in);
+ } catch (JobExecutionException e) {
+ LOG.log(Level.WARNING,
+ "Job execution exception using resource manager to execute job: Message: "
+ + e.getMessage());
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
+ */
+ @Override
+ public void shutdown() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ protected boolean safeCheckJobComplete(String jobId) {
+ try {
+ return rClient.isJobComplete(jobId);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Exception checking completion status for job: ["
+ + jobId + "]: Messsage: " + e.getMessage());
+ return false;
+ }
+ }
+
+ protected boolean stopJob(String jobId) {
+ if (this.rClient != null && this.currentJobId != null) {
+ if (!this.rClient.killJob(this.currentJobId)) {
+ LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
+ + this.currentJobId + "]: failed");
+ return false;
+ } else
+ return true;
+ } else
+ return false;
+ }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java?rev=1292033&r1=1292032&r2=1292033&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java Tue Feb 21 21:14:47 2012
@@ -35,6 +35,7 @@ import org.apache.oodt.commons.util.Date
import java.net.URL;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -70,171 +71,10 @@ public class SequentialProcessor extends
this.conditionEvaluator = new ConditionProcessor();
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#start()
- */
- @Override
- public void start() {
-
- String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
- this.workflowInstance.setStartDateTimeIsoStr(startDateTimeIsoStr);
- this.persistWorkflowInstance();
-
- while (running && taskIterator.hasNext()) {
- if (isPaused()) {
- LOG.log(Level.FINE,
- "SequentialProcessor: Skipping execution: Paused: CurrentTask: "
- + getTaskNameById(this.workflowInstance.getCurrentTaskId()));
- continue;
- }
-
- WorkflowTask task = (WorkflowTask) taskIterator.next();
- this.workflowInstance.setCurrentTaskId(task.getTaskId());
-
- this.persistWorkflowInstance();
- if (!checkTaskRequiredMetadata(task,
- this.workflowInstance.getSharedContext())) {
- this.workflowInstance.setStatus(METADATA_MISSING);
- this.persistWorkflowInstance();
- return;
- }
-
- if (task.getConditions() != null) {
- while(!this.conditionEvaluator.satisfied(task.getConditions(), task.getTaskId(),
- this.workflowInstance.getSharedContext()) && isRunning()) {
-
- if (!isPaused()) {
- pause();
- }
-
- LOG.log(Level.FINEST,
- "Pre-conditions for task: " + task.getTaskName()
- + " unsatisfied: waiting: " + waitForConditionSatisfy
- + " seconds before checking again.");
- try {
- Thread.currentThread().sleep(waitForConditionSatisfy * 1000);
- } catch (InterruptedException ignore) {
- }
-
- if (!isPaused()) {
- break;
- }
- }
-
- if (!isRunning()) {
- break;
- }
-
- if (isPaused()) {
- resume();
- }
- }
- LOG.log(
- Level.FINEST,
- "IterativeWorkflowProcessorThread: Executing task: "
- + task.getTaskName());
-
- WorkflowTaskInstance taskInstance = GenericWorkflowObjectFactory
- .getTaskObjectFromClassName(task.getTaskInstanceClassName());
- this.workflowInstance.getSharedContext().replaceMetadata(TASK_ID,
- task.getTaskId());
- this.workflowInstance.getSharedContext().replaceMetadata(
- WORKFLOW_INST_ID, this.workflowInstance.getId());
- this.workflowInstance.getSharedContext().replaceMetadata(JOB_ID,
- this.workflowInstance.getId());
- this.workflowInstance.getSharedContext().replaceMetadata(PROCESSING_NODE,
- getHostname());
- this.workflowInstance.getSharedContext().replaceMetadata(
- WORKFLOW_MANAGER_URL, this.wmgrParentUrl.toString());
-
- if (rClient != null) {
- Job taskJob = new Job();
- taskJob.setName(task.getTaskId());
- taskJob
- .setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
- taskJob
- .setJobInputClassName("org.apache.oodt.cas.workflow.structs.TaskJobInput");
- taskJob.setLoadValue(new Integer(2));
- taskJob
- .setQueueName(task.getTaskConfig().getProperty(QUEUE_NAME) != null ? task
- .getTaskConfig().getProperty(QUEUE_NAME) : DEFAULT_QUEUE_NAME);
-
- TaskJobInput in = new TaskJobInput();
- in.setDynMetadata(this.workflowInstance.getSharedContext());
- in.setTaskConfig(task.getTaskConfig());
- in.setWorkflowTaskInstanceClassName(task.getTaskInstanceClassName());
-
- this.workflowInstance.setStatus(RESMGR_SUBMIT);
- this.persistWorkflowInstance();
-
- try {
- this.currentJobId = rClient.submitJob(taskJob, in);
- while (!safeCheckJobComplete(this.currentJobId) && isRunning()) {
- try {
- Thread.currentThread().sleep(pollingWaitTime * 1000);
- } catch (InterruptedException ignore) {
- }
- }
-
- if (!isRunning()) {
- break;
- }
-
- WorkflowInstance updatedInst = null;
- try {
- updatedInst = instanceRepository
- .getWorkflowInstanceById(this.workflowInstance.getId());
- this.workflowInstance = updatedInst;
- } catch (InstanceRepositoryException e) {
- e.printStackTrace();
- LOG.log(Level.WARNING, "Unable to get " + "updated workflow "
- + "instance record " + "when executing remote job: Message: "
- + e.getMessage());
- }
-
- } catch (JobExecutionException e) {
- LOG.log(Level.WARNING,
- "Job execution exception using resource manager to execute job: Message: "
- + e.getMessage());
- }
- } else {
- this.workflowInstance.setStatus(STARTED);
- String currentTaskIsoStartDateTimeStr = DateConvert
- .isoFormat(new Date());
- this.workflowInstance
- .setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
- this.workflowInstance.setCurrentTaskEndDateTimeIsoStr(null);
- this.persistWorkflowInstance();
- this.executeTaskLocally(taskInstance,
- this.workflowInstance.getSharedContext(), task.getTaskConfig(),
- task.getTaskName());
- String currentTaskIsoEndDateTimeStr = DateConvert.isoFormat(new Date());
- this.workflowInstance
- .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
- this.persistWorkflowInstance();
- }
-
- LOG.log(Level.FINEST, "SequentialWorkflowProcessor: Completed task: "
- + task.getTaskName());
-
- }
-
- LOG.log(Level.FINEST, "SequentialWorkflowProcessor: Completed workflow: "
- + this.workflowInstance.getWorkflow().getName());
- if (isRunning()) {
- stop();
- }
-
- }
public synchronized void stop() {
running = false;
- if (this.rClient != null && this.currentJobId != null) {
- if (!this.rClient.killJob(this.currentJobId)) {
- LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
- + this.currentJobId + "]: failed");
- }
- }
+ //something with resource manager client here
this.workflowInstance.setStatus(FINISHED);
String isoEndDateTimeStr = DateConvert.isoFormat(new Date());
@@ -243,15 +83,25 @@ public class SequentialProcessor extends
}
public synchronized void resume() {
- this.paused = false;
+ //this.paused = false;
this.workflowInstance.setStatus(STARTED);
this.persistWorkflowInstance();
}
public synchronized void pause() {
- this.paused = true;
+ //this.paused = true;
this.workflowInstance.setStatus(PAUSED);
this.persistWorkflowInstance();
}
+
+ /* (non-Javadoc)
+ * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#getRunnableSubProcessors()
+ */
+ @Override
+ protected List<WorkflowProcessor> getRunnableSubProcessors() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT Imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
+/**
+ *
+ * Executes a {@link WorkflowTask} locally on the WM's machine, using
+ * synchronous blocking before running the next task.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class SynchronousLocalEngineRunner extends EngineRunner {
+
+ private static final Logger LOG = Logger
+ .getLogger(SynchronousLocalEngineRunner.class.getName());
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
+ * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+ */
+ @Override
+ public void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+ throws Exception {
+ WorkflowTaskInstance inst = GenericWorkflowObjectFactory
+ .getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
+ try {
+ inst.run(dynMetadata, workflowTask.getTaskConfig());
+ } catch (Exception e) {
+ LOG.log(Level.WARNING,
+ "Exception executing task: [" + workflowTask.getTaskName()
+ + "]: Message: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
+ */
+ @Override
+ public void shutdown() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.workflow.engine;
+
+import java.net.URL;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ *
+ * Describe your class here.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class TaskProcessor extends WorkflowProcessor {
+
+ private WorkflowTask task;
+
+ private static final Logger LOG = Logger.getLogger(TaskProcessor.class.getName());
+
+ /**
+ * @param workflowInstance
+ * @param instRep
+ * @param wParentUrl
+ * @param conditionWait
+ */
+ public TaskProcessor(WorkflowInstance workflowInstance,
+ WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
+ super(workflowInstance, instRep, wParentUrl, conditionWait);
+ // TODO Auto-generated constructor stub
+ }
+
+ public WorkflowTask getTask(){
+ return this.task;
+ }
+
+ public boolean checkTaskRequiredMetadata(
+ Metadata dynMetadata) {
+ if (task.getRequiredMetFields() == null
+ || (task.getRequiredMetFields() != null && task.getRequiredMetFields()
+ .size() == 0)) {
+ LOG.log(Level.INFO, "Task: [" + task.getTaskName()
+ + "] has no required metadata fields");
+ return true; /* no required metadata, so we're fine */
+ }
+
+ for (String reqField : (List<String>) (List<?>) task.getRequiredMetFields()) {
+ if (!dynMetadata.containsKey(reqField)) {
+ LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
+ + "] for task: [" + task.getTaskName()
+ + "]: failed: aborting workflow");
+ return false;
+ }
+ }
+
+ LOG.log(Level.INFO, "All required metadata fields present for task: ["
+ + task.getTaskName() + "]");
+
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#getRunnableSubProcessors()
+ */
+ @Override
+ protected List<WorkflowProcessor> getRunnableSubProcessors() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1292033&r1=1292032&r2=1292033&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Tue Feb 21 21:14:47 2012
@@ -18,16 +18,13 @@
package org.apache.oodt.cas.workflow.engine;
//JDK imports
-import java.net.InetAddress;
import java.net.URL;
-import java.net.UnknownHostException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
@@ -53,22 +50,16 @@ public abstract class WorkflowProcessor
protected WorkflowInstanceRepository instanceRepository = null;
- protected XmlRpcResourceManagerClient rClient = null;
-
protected long pollingWaitTime = 10L;
protected boolean running = false;
protected int timesPaused;
- protected static final String DEFAULT_QUEUE_NAME = "high";
-
protected URL wmgrParentUrl = null;
protected String currentJobId = null;
- protected boolean paused = false;
-
public WorkflowProcessor(WorkflowInstance workflowInstance,
WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
this.workflowInstance = workflowInstance;
@@ -77,7 +68,6 @@ public abstract class WorkflowProcessor
this.waitForConditionSatisfy = conditionWait;
this.pollingWaitTime = conditionWait;
this.wmgrParentUrl = wParentUrl;
-
}
/**
@@ -96,40 +86,6 @@ public abstract class WorkflowProcessor
}
/**
- * <p>
- * Stops once and for all the thread from processing the workflow. This method
- * should not maintain the state of the workflow, it should gracefully shut
- * down the WorkflowProcessor and any of its subsequent resources.
- * </p>
- *
- */
- public abstract void stop();
-
- /**
- * <p>
- * Resumes execution of a {@link #pause}d {@link WorkflowInstace} by this
- * WorkflowProcessor.
- * </p>
- *
- */
- public abstract void resume();
-
- /**
- * <p>
- * Pauses exectuion of a {@link WorkflowInstace} being handled by this
- * WorkflowProcessor.
- * </p>
- *
- */
- public abstract void pause();
-
-
- /**
- * Starts execution of the subordinate {@link WorkflowProcessor}.
- */
- public abstract void start();
-
- /**
* Returns the identifier of the current {@link WorkflowTask} being processed
* by this WorkflowProcessor.
*
@@ -154,41 +110,29 @@ public abstract class WorkflowProcessor
public void setRunning(boolean running) {
this.running = running;
}
-
- /**
- * @return the paused
- */
- public boolean isPaused() {
- return paused;
- }
-
- /**
- * @param paused
- * the paused to set
- */
- public void setPaused(boolean paused) {
- this.paused = paused;
- }
-
- /**
- * @return the rClient
- */
- public XmlRpcResourceManagerClient getrClient() {
- return rClient;
+
+ @Override
+ public String toString(){
+ StringBuilder builder = new StringBuilder();
+ builder.append("processor:[type=");
+ builder.append(getClass().toString());
+ builder.append(",startdate=");
+ builder.append(getWorkflowInstance().getStartDate());
+ builder.append(",priority=");
+ builder.append(getWorkflowInstance().getPriority());
+ builder.append("]");
+ return builder.toString();
}
/**
- * @param client
- * the rClient to set
+ * Gets the runnable consituent elements of this {@link WorkflowProcessor}
+ * which may include {@link WorkflowProcessor}s themselves.
+ *
+ * @return The current runnable set of {@link WorkflowProcessor}s that this
+ * {@link WorkflowProcessor} is modeling.
*/
- public void setRClient(XmlRpcResourceManagerClient client) {
- rClient = client;
- if (rClient != null) {
- LOG.log(Level.INFO, "Resource Manager Job Submission enabled to: ["
- + rClient.getResMgrUrl() + "]");
- }
- }
-
+ protected abstract List<WorkflowProcessor> getRunnableSubProcessors();
+
protected void persistWorkflowInstance() {
try {
instanceRepository.updateWorkflowInstance(this.workflowInstance);
@@ -210,41 +154,6 @@ public abstract class WorkflowProcessor
}
}
- protected boolean safeCheckJobComplete(String jobId) {
- try {
- return rClient.isJobComplete(jobId);
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Exception checking completion status for job: ["
- + jobId + "]: Messsage: " + e.getMessage());
- return false;
- }
- }
-
- protected boolean checkTaskRequiredMetadata(WorkflowTask task,
- Metadata dynMetadata) {
- if (task.getRequiredMetFields() == null
- || (task.getRequiredMetFields() != null && task.getRequiredMetFields()
- .size() == 0)) {
- LOG.log(Level.INFO, "Task: [" + task.getTaskName()
- + "] has no required metadata fields");
- return true; /* no required metadata, so we're fine */
- }
-
- for (String reqField : (List<String>) (List<?>) task.getRequiredMetFields()) {
- if (!dynMetadata.containsKey(reqField)) {
- LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
- + "] for task: [" + task.getTaskName()
- + "]: failed: aborting workflow");
- return false;
- }
- }
-
- LOG.log(Level.INFO, "All required metadata fields present for task: ["
- + task.getTaskName() + "]");
-
- return true;
- }
-
protected String getTaskNameById(String taskId) {
for (WorkflowTask task : (List<WorkflowTask>) (List<?>) this.workflowInstance
.getWorkflow().getTasks()) {
@@ -255,17 +164,5 @@ public abstract class WorkflowProcessor
return null;
}
-
- protected String getHostname() {
- try {
- // Get hostname by textual representation of IP address
- InetAddress addr = InetAddress.getLocalHost();
- // Get the host name
- String hostname = addr.getHostName();
- return hostname;
- } catch (UnknownHostException e) {
- }
- return null;
- }
-
+
}