You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/02/21 22:14:47 UTC

svn commit: r1292033 - /oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/

Author: mattmann
Date: Tue Feb 21 21:14:47 2012
New Revision: 1292033

URL: http://svn.apache.org/viewvc?rev=1292033&view=rev
Log:
OODT-215/OODT-381: Create Runner framework to allow flexible WorkflowTask execution on different runtimes WIP

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
+/**
+ * 
+ * Runs a local version of a {@link WorkflowTask} asynchronously.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class AsynchronousLocalEngineRunner extends EngineRunner {
+
+  private static final Logger LOG = Logger
+      .getLogger(AsynchronousLocalEngineRunner.class.getName());
+
+  private ThreadPoolExecutor executor;
+
+  private Map<String, Thread> workerMap;
+
+  public AsynchronousLocalEngineRunner() {
+    this.executor = new ThreadPoolExecutor(0, 0, 0, TimeUnit.SECONDS, null,
+        new RejectedExecutionHandler() {
+
+          @Override
+          public void rejectedExecution(Runnable workflow,
+              ThreadPoolExecutor executor) {
+            // TODO Auto-generated method stub
+
+          }
+        });
+    this.workerMap = new HashMap<String, Thread>();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
+   * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+   */
+  @Override
+  public void execute(final WorkflowTask workflowTask,
+      final Metadata dynMetadata) throws Exception {
+    Thread worker = new Thread() {
+
+      @Override
+      public void run() {
+        WorkflowTaskInstance inst = GenericWorkflowObjectFactory
+            .getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
+        try {
+          inst.run(dynMetadata, workflowTask.getTaskConfig());
+        } catch (Exception e) {
+          LOG.log(Level.WARNING,
+              "Exception executing task: [" + workflowTask.getTaskName()
+                  + "]: Message: " + e.getMessage());
+          e.printStackTrace();
+        }
+
+      }
+
+      /*
+       * (non-Javadoc)
+       * 
+       * @see java.lang.Thread#interrupt()
+       */
+      @SuppressWarnings("deprecation")
+      @Override
+      public void interrupt() {
+        super.interrupt();
+        this.destroy();
+      }
+
+    };
+
+    String id = null;
+    synchronized (id) {
+      id = UUID.randomUUID().toString();
+      this.workerMap.put(id, worker);
+      this.executor.execute(worker);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
+   */
+  @Override
+  public void shutdown() throws Exception {
+    for (Thread worker : this.workerMap.values()) {
+      if (worker != null) {
+        worker.interrupt();
+        worker = null;
+      }
+    }
+
+  }
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java?rev=1292033&r1=1292032&r2=1292033&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ConditionProcessor.java Tue Feb 21 21:14:47 2012
@@ -115,7 +115,7 @@ public class ConditionProcessor {
     } else {
       LOG.log(Level.INFO, "Condition: [" + condition.getConditionId()
           + "] is required: evaluation results: [" + result + "] included.");
-      return false;
+      return result;
     }
   }
 

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ * 
+ * Obfuscates the underlying substrate on which a {@link WorkflowTask} should
+ * run. In short, executes a {@link WorkflowTask} for the Workflow Engine.
+ * 
+ * @author bfoster
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public abstract class EngineRunner {
+
+  /**
+   * Executes a {@link WorkflowTask} on an execution substrate. Ideally there
+   * will only ever be two of these substrates, one for local execution, and
+   * another for communication with the Resource Manager.
+   * 
+   * @param workflowTask
+   *          The model of the {@link WorkflowTask} to instantiate and execute.
+   * @param dynMetadata
+   *          The dynamic {@link Metadata} passed to this {@link WorkflowTask}.
+   * 
+   * @throws Exception
+   *           If any error occurs.
+   */
+  public abstract void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+      throws Exception;
+
+  /**
+   * Shuts this runner down and frees its resources.
+   * 
+   * @throws Exception
+   *           If any error occurs while freeing resources.
+   * 
+   */
+  public abstract void shutdown() throws Exception;
+
+}

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
+import org.apache.oodt.cas.workflow.structs.TaskJobInput;
+import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ * 
+ * Submits a {@link WorkflowTask} to the Resource Manager.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class ResourceRunner extends EngineRunner implements CoreMetKeys,
+    WorkflowStatus {
+
+  private static final Logger LOG = Logger.getLogger(ResourceRunner.class
+      .getName());
+
+  protected static final String DEFAULT_QUEUE_NAME = "high";
+
+  protected XmlRpcResourceManagerClient rClient;
+
+  private String currentJobId;
+
+  public ResourceRunner(URL resUrl) {
+    this.rClient = new XmlRpcResourceManagerClient(resUrl);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
+   * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+   */
+  @Override
+  public void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+      throws Exception {
+    Job workflowTaskJob = new Job();
+    workflowTaskJob.setName(workflowTask.getTaskId());
+    workflowTaskJob
+        .setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
+    workflowTaskJob
+        .setJobInputClassName("org.apache.oodt.cas.workflow.structs.TaskJobInput");
+    workflowTaskJob.setLoadValue(new Integer(2));
+    workflowTaskJob.setQueueName(workflowTask.getTaskConfig().getProperty(
+        QUEUE_NAME) != null ? workflowTask.getTaskConfig().getProperty(
+        QUEUE_NAME) : DEFAULT_QUEUE_NAME);
+
+    if (workflowTask.getTaskConfig().getProperty(TASK_LOAD) != null) {
+      workflowTaskJob.setLoadValue(Integer.valueOf(workflowTask.getTaskConfig()
+          .getProperty(TASK_LOAD)));
+    }
+
+    TaskJobInput in = new TaskJobInput();
+    in.setDynMetadata(dynMetadata);
+    in.setTaskConfig(workflowTask.getTaskConfig());
+    in.setWorkflowTaskInstanceClassName(workflowTask.getTaskInstanceClassName());
+
+    try {
+      this.currentJobId = rClient.submitJob(workflowTaskJob, in);
+    } catch (JobExecutionException e) {
+      LOG.log(Level.WARNING,
+          "Job execution exception using resource manager to execute job: Message: "
+              + e.getMessage());
+    }
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
+   */
+  @Override
+  public void shutdown() throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  protected boolean safeCheckJobComplete(String jobId) {
+    try {
+      return rClient.isJobComplete(jobId);
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Exception checking completion status for job: ["
+          + jobId + "]: Messsage: " + e.getMessage());
+      return false;
+    }
+  }
+
+  protected boolean stopJob(String jobId) {
+    if (this.rClient != null && this.currentJobId != null) {
+      if (!this.rClient.killJob(this.currentJobId)) {
+        LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
+            + this.currentJobId + "]: failed");
+        return false;
+      } else
+        return true;
+    } else
+      return false;
+  }
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java?rev=1292033&r1=1292032&r2=1292033&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialProcessor.java Tue Feb 21 21:14:47 2012
@@ -35,6 +35,7 @@ import org.apache.oodt.commons.util.Date
 import java.net.URL;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -70,171 +71,10 @@ public class SequentialProcessor extends
     this.conditionEvaluator = new ConditionProcessor();
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#start()
-   */
-  @Override
-  public void start() {
-
-    String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
-    this.workflowInstance.setStartDateTimeIsoStr(startDateTimeIsoStr);
-    this.persistWorkflowInstance();
-
-    while (running && taskIterator.hasNext()) {
-      if (isPaused()) {
-        LOG.log(Level.FINE,
-            "SequentialProcessor: Skipping execution: Paused: CurrentTask: "
-                + getTaskNameById(this.workflowInstance.getCurrentTaskId()));
-        continue;
-      }
-
-      WorkflowTask task = (WorkflowTask) taskIterator.next();
-      this.workflowInstance.setCurrentTaskId(task.getTaskId());
-
-      this.persistWorkflowInstance();
-      if (!checkTaskRequiredMetadata(task,
-          this.workflowInstance.getSharedContext())) {
-        this.workflowInstance.setStatus(METADATA_MISSING);
-        this.persistWorkflowInstance();
-        return;
-      }
-
-      if (task.getConditions() != null) {
-        while(!this.conditionEvaluator.satisfied(task.getConditions(), task.getTaskId(),
-            this.workflowInstance.getSharedContext()) && isRunning()) {
-
-          if (!isPaused()) {
-            pause();
-          }
-
-          LOG.log(Level.FINEST,
-              "Pre-conditions for task: " + task.getTaskName()
-                  + " unsatisfied: waiting: " + waitForConditionSatisfy
-                  + " seconds before checking again.");
-          try {
-            Thread.currentThread().sleep(waitForConditionSatisfy * 1000);
-          } catch (InterruptedException ignore) {
-          }
-
-          if (!isPaused()) {
-            break;
-          }
-        }
-
-        if (!isRunning()) {
-          break;
-        }
-
-        if (isPaused()) {
-          resume();
-        }
-      }
-      LOG.log(
-          Level.FINEST,
-          "IterativeWorkflowProcessorThread: Executing task: "
-              + task.getTaskName());
-
-      WorkflowTaskInstance taskInstance = GenericWorkflowObjectFactory
-          .getTaskObjectFromClassName(task.getTaskInstanceClassName());
-      this.workflowInstance.getSharedContext().replaceMetadata(TASK_ID,
-          task.getTaskId());
-      this.workflowInstance.getSharedContext().replaceMetadata(
-          WORKFLOW_INST_ID, this.workflowInstance.getId());
-      this.workflowInstance.getSharedContext().replaceMetadata(JOB_ID,
-          this.workflowInstance.getId());
-      this.workflowInstance.getSharedContext().replaceMetadata(PROCESSING_NODE,
-          getHostname());
-      this.workflowInstance.getSharedContext().replaceMetadata(
-          WORKFLOW_MANAGER_URL, this.wmgrParentUrl.toString());
-
-      if (rClient != null) {
-        Job taskJob = new Job();
-        taskJob.setName(task.getTaskId());
-        taskJob
-            .setJobInstanceClassName("org.apache.oodt.cas.workflow.structs.TaskJob");
-        taskJob
-            .setJobInputClassName("org.apache.oodt.cas.workflow.structs.TaskJobInput");
-        taskJob.setLoadValue(new Integer(2));
-        taskJob
-            .setQueueName(task.getTaskConfig().getProperty(QUEUE_NAME) != null ? task
-                .getTaskConfig().getProperty(QUEUE_NAME) : DEFAULT_QUEUE_NAME);
-
-        TaskJobInput in = new TaskJobInput();
-        in.setDynMetadata(this.workflowInstance.getSharedContext());
-        in.setTaskConfig(task.getTaskConfig());
-        in.setWorkflowTaskInstanceClassName(task.getTaskInstanceClassName());
-
-        this.workflowInstance.setStatus(RESMGR_SUBMIT);
-        this.persistWorkflowInstance();
-
-        try {
-          this.currentJobId = rClient.submitJob(taskJob, in);
-          while (!safeCheckJobComplete(this.currentJobId) && isRunning()) {
-            try {
-              Thread.currentThread().sleep(pollingWaitTime * 1000);
-            } catch (InterruptedException ignore) {
-            }
-          }
-
-          if (!isRunning()) {
-            break;
-          }
-
-          WorkflowInstance updatedInst = null;
-          try {
-            updatedInst = instanceRepository
-                .getWorkflowInstanceById(this.workflowInstance.getId());
-            this.workflowInstance = updatedInst;
-          } catch (InstanceRepositoryException e) {
-            e.printStackTrace();
-            LOG.log(Level.WARNING, "Unable to get " + "updated workflow "
-                + "instance record " + "when executing remote job: Message: "
-                + e.getMessage());
-          }
-
-        } catch (JobExecutionException e) {
-          LOG.log(Level.WARNING,
-              "Job execution exception using resource manager to execute job: Message: "
-                  + e.getMessage());
-        }
-      } else {
-        this.workflowInstance.setStatus(STARTED);
-        String currentTaskIsoStartDateTimeStr = DateConvert
-            .isoFormat(new Date());
-        this.workflowInstance
-            .setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
-        this.workflowInstance.setCurrentTaskEndDateTimeIsoStr(null); 
-        this.persistWorkflowInstance();
-        this.executeTaskLocally(taskInstance,
-            this.workflowInstance.getSharedContext(), task.getTaskConfig(),
-            task.getTaskName());
-        String currentTaskIsoEndDateTimeStr = DateConvert.isoFormat(new Date());
-        this.workflowInstance
-            .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
-        this.persistWorkflowInstance();
-      }
-
-      LOG.log(Level.FINEST, "SequentialWorkflowProcessor: Completed task: "
-          + task.getTaskName());
-
-    }
-
-    LOG.log(Level.FINEST, "SequentialWorkflowProcessor: Completed workflow: "
-        + this.workflowInstance.getWorkflow().getName());
-    if (isRunning()) {
-      stop();
-    }
-
-  }
 
   public synchronized void stop() {
     running = false;
-    if (this.rClient != null && this.currentJobId != null) {
-      if (!this.rClient.killJob(this.currentJobId)) {
-        LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
-            + this.currentJobId + "]: failed");
-      }
-    }
+    //something with resource manager client here
 
     this.workflowInstance.setStatus(FINISHED);
     String isoEndDateTimeStr = DateConvert.isoFormat(new Date());
@@ -243,15 +83,25 @@ public class SequentialProcessor extends
   }
 
   public synchronized void resume() {
-    this.paused = false;
+    //this.paused = false;
     this.workflowInstance.setStatus(STARTED);
     this.persistWorkflowInstance();
   }
 
   public synchronized void pause() {
-    this.paused = true;
+    //this.paused = true;
     this.workflowInstance.setStatus(PAUSED);
     this.persistWorkflowInstance();
   }
 
+
+  /* (non-Javadoc)
+   * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#getRunnableSubProcessors()
+   */
+  @Override
+  protected List<WorkflowProcessor> getRunnableSubProcessors() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
 }

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT Imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
+
+/**
+ * 
+ * Executes a {@link WorkflowTask} locally on the WM's machine, using
+ * synchronous blocking before running the next task.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class SynchronousLocalEngineRunner extends EngineRunner {
+
+  private static final Logger LOG = Logger
+      .getLogger(SynchronousLocalEngineRunner.class.getName());
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.EngineRunner#execute(org.apache.oodt
+   * .cas.workflow.structs.WorkflowTask, org.apache.oodt.cas.metadata.Metadata)
+   */
+  @Override
+  public void execute(WorkflowTask workflowTask, Metadata dynMetadata)
+      throws Exception {
+    WorkflowTaskInstance inst = GenericWorkflowObjectFactory
+        .getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
+    try {
+      inst.run(dynMetadata, workflowTask.getTaskConfig());
+    } catch (Exception e) {
+      LOG.log(Level.WARNING,
+          "Exception executing task: [" + workflowTask.getTaskName()
+              + "]: Message: " + e.getMessage());
+      e.printStackTrace();
+    }
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
+   */
+  @Override
+  public void shutdown() throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+}

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java?rev=1292033&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskProcessor.java Tue Feb 21 21:14:47 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.workflow.engine;
+
+import java.net.URL;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ *
+ * Describe your class here.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class TaskProcessor extends WorkflowProcessor {
+
+  private WorkflowTask task;
+  
+  private static final Logger LOG = Logger.getLogger(TaskProcessor.class.getName());
+  
+  /**
+   * @param workflowInstance
+   * @param instRep
+   * @param wParentUrl
+   * @param conditionWait
+   */
+  public TaskProcessor(WorkflowInstance workflowInstance,
+      WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
+    super(workflowInstance, instRep, wParentUrl, conditionWait);
+    // TODO Auto-generated constructor stub
+  }
+
+  public WorkflowTask getTask(){
+    return this.task;
+  }
+  
+  public boolean checkTaskRequiredMetadata(
+      Metadata dynMetadata) {
+    if (task.getRequiredMetFields() == null
+        || (task.getRequiredMetFields() != null && task.getRequiredMetFields()
+            .size() == 0)) {
+      LOG.log(Level.INFO, "Task: [" + task.getTaskName()
+          + "] has no required metadata fields");
+      return true; /* no required metadata, so we're fine */
+    }
+
+    for (String reqField : (List<String>) (List<?>) task.getRequiredMetFields()) {
+      if (!dynMetadata.containsKey(reqField)) {
+        LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
+            + "] for task: [" + task.getTaskName()
+            + "]: failed: aborting workflow");
+        return false;
+      }
+    }
+
+    LOG.log(Level.INFO, "All required metadata fields present for task: ["
+        + task.getTaskName() + "]");
+
+    return true;
+  }  
+  
+  /* (non-Javadoc)
+   * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#getRunnableSubProcessors()
+   */
+  @Override
+  protected List<WorkflowProcessor> getRunnableSubProcessors() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+  
+  
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1292033&r1=1292032&r2=1292033&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Tue Feb 21 21:14:47 2012
@@ -18,16 +18,13 @@
 package org.apache.oodt.cas.workflow.engine;
 
 //JDK imports
-import java.net.InetAddress;
 import java.net.URL;
-import java.net.UnknownHostException;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
@@ -53,22 +50,16 @@ public abstract class WorkflowProcessor 
 
   protected WorkflowInstanceRepository instanceRepository = null;
 
-  protected XmlRpcResourceManagerClient rClient = null;
-
   protected long pollingWaitTime = 10L;
 
   protected boolean running = false;
 
   protected int timesPaused;
 
-  protected static final String DEFAULT_QUEUE_NAME = "high";
-
   protected URL wmgrParentUrl = null;
 
   protected String currentJobId = null;
 
-  protected boolean paused = false;
-
   public WorkflowProcessor(WorkflowInstance workflowInstance,
       WorkflowInstanceRepository instRep, URL wParentUrl, long conditionWait) {
     this.workflowInstance = workflowInstance;
@@ -77,7 +68,6 @@ public abstract class WorkflowProcessor 
     this.waitForConditionSatisfy = conditionWait;
     this.pollingWaitTime = conditionWait;
     this.wmgrParentUrl = wParentUrl;
-
   }
 
   /**
@@ -96,40 +86,6 @@ public abstract class WorkflowProcessor 
   }
 
   /**
-   * <p>
-   * Stops once and for all the thread from processing the workflow. This method
-   * should not maintain the state of the workflow, it should gracefully shut
-   * down the WorkflowProcessor and any of its subsequent resources.
-   * </p>
-   * 
-   */
-  public abstract void stop();
-
-  /**
-   * <p>
-   * Resumes execution of a {@link #pause}d {@link WorkflowInstace} by this
-   * WorkflowProcessor.
-   * </p>
-   * 
-   */
-  public abstract void resume();
-
-  /**
-   * <p>
-   * Pauses exectuion of a {@link WorkflowInstace} being handled by this
-   * WorkflowProcessor.
-   * </p>
-   * 
-   */
-  public abstract void pause();
-  
-  
-  /**
-   * Starts execution of the subordinate {@link WorkflowProcessor}.
-   */
-  public abstract void start();
-
-  /**
    * Returns the identifier of the current {@link WorkflowTask} being processed
    * by this WorkflowProcessor.
    * 
@@ -154,41 +110,29 @@ public abstract class WorkflowProcessor 
   public void setRunning(boolean running) {
     this.running = running;
   }
-
-  /**
-   * @return the paused
-   */
-  public boolean isPaused() {
-    return paused;
-  }
-
-  /**
-   * @param paused
-   *          the paused to set
-   */
-  public void setPaused(boolean paused) {
-    this.paused = paused;
-  }
-
-  /**
-   * @return the rClient
-   */
-  public XmlRpcResourceManagerClient getrClient() {
-    return rClient;
+  
+  @Override
+  public String toString(){
+    StringBuilder builder = new StringBuilder();
+    builder.append("processor:[type=");
+    builder.append(getClass().toString());
+    builder.append(",startdate=");
+    builder.append(getWorkflowInstance().getStartDate());
+    builder.append(",priority=");
+    builder.append(getWorkflowInstance().getPriority());
+    builder.append("]");
+    return builder.toString();
   }
 
   /**
-   * @param client
-   *          the rClient to set
+   * Gets the runnable consituent elements of this {@link WorkflowProcessor}
+   * which may include {@link WorkflowProcessor}s themselves.
+   * 
+   * @return The current runnable set of {@link WorkflowProcessor}s that this 
+   * {@link WorkflowProcessor} is modeling.
    */
-  public void setRClient(XmlRpcResourceManagerClient client) {
-    rClient = client;
-    if (rClient != null) {
-      LOG.log(Level.INFO, "Resource Manager Job Submission enabled to: ["
-          + rClient.getResMgrUrl() + "]");
-    }
-  }
-
+  protected abstract List<WorkflowProcessor> getRunnableSubProcessors();
+  
   protected void persistWorkflowInstance() {
     try {
       instanceRepository.updateWorkflowInstance(this.workflowInstance);
@@ -210,41 +154,6 @@ public abstract class WorkflowProcessor 
     }
   }
 
-  protected boolean safeCheckJobComplete(String jobId) {
-    try {
-      return rClient.isJobComplete(jobId);
-    } catch (Exception e) {
-      LOG.log(Level.WARNING, "Exception checking completion status for job: ["
-          + jobId + "]: Messsage: " + e.getMessage());
-      return false;
-    }
-  }
-
-  protected boolean checkTaskRequiredMetadata(WorkflowTask task,
-      Metadata dynMetadata) {
-    if (task.getRequiredMetFields() == null
-        || (task.getRequiredMetFields() != null && task.getRequiredMetFields()
-            .size() == 0)) {
-      LOG.log(Level.INFO, "Task: [" + task.getTaskName()
-          + "] has no required metadata fields");
-      return true; /* no required metadata, so we're fine */
-    }
-
-    for (String reqField : (List<String>) (List<?>) task.getRequiredMetFields()) {
-      if (!dynMetadata.containsKey(reqField)) {
-        LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
-            + "] for task: [" + task.getTaskName()
-            + "]: failed: aborting workflow");
-        return false;
-      }
-    }
-
-    LOG.log(Level.INFO, "All required metadata fields present for task: ["
-        + task.getTaskName() + "]");
-
-    return true;
-  }
-
   protected String getTaskNameById(String taskId) {
     for (WorkflowTask task : (List<WorkflowTask>) (List<?>) this.workflowInstance
         .getWorkflow().getTasks()) {
@@ -255,17 +164,5 @@ public abstract class WorkflowProcessor 
 
     return null;
   }
-
-  protected String getHostname() {
-    try {
-      // Get hostname by textual representation of IP address
-      InetAddress addr = InetAddress.getLocalHost();
-      // Get the host name
-      String hostname = addr.getHostName();
-      return hostname;
-    } catch (UnknownHostException e) {
-    }
-    return null;
-  }
-
+  
 }