You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:01 UTC

[airavata] 04/17: Fixing env setup task

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit cb54e4df2eb5ae453290a5d29cd3c0a8033c993d
Author: dimuthu <di...@gmail.com>
AuthorDate: Tue Feb 27 11:04:32 2018 -0500

    Fixing env setup task
---
 .../apache/airavata/helix/core/util/TaskUtil.java  | 19 ++++----
 .../airavata/helix/impl/task/AiravataTask.java     | 10 ++--
 .../airavata/helix/impl/task/EnvSetupTask.java     | 37 ++++-----------
 .../helix/impl/workflow/SimpleWorkflow.java        | 54 ++++++++++++++++++++--
 4 files changed, 75 insertions(+), 45 deletions(-)

diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index d0f1ab6..218bd94 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -19,15 +19,18 @@ import java.util.Map;
  */
 public class TaskUtil {
 
-    public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T task) throws IllegalAccessException {
-        Field[] fields = task.getClass().getDeclaredFields();
+    public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException {
+
         List<OutPort> outPorts = new ArrayList<>();
-        for (Field field : fields) {
-            TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
-            if (outPortAnnotation != null) {
-                field.setAccessible(true);
-                OutPort outPort = (OutPort) field.get(task);
-                outPorts.add(outPort);
+        for (Class<?> c = taskObj.getClass(); c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+            for (Field field : fields) {
+                TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
+                if (outPortAnnotation != null) {
+                    field.setAccessible(true);
+                    OutPort outPort = (OutPort) field.get(taskObj);
+                    outPorts.add(outPort);
+                }
             }
         }
         return outPorts;
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 315c07c..26361d2 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -51,14 +51,13 @@ public abstract class AiravataTask extends AbstractTask {
     @TaskParam(name = "gatewayId")
     private String gatewayId;
 
-    @TaskOutPort(name = "Success Port")
-    private OutPort onSuccess;
-
+    @TaskOutPort(name = "Next Task")
+    private OutPort nextTask;
 
     protected TaskResult onSuccess(String message) {
         String successMessage = "Task " + getTaskId() + " completed." + message != null ? " Message : " + message : "";
         logger.info(successMessage);
-        return onSuccess.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+        return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
     }
 
     protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
@@ -178,4 +177,7 @@ public abstract class AiravataTask extends AbstractTask {
         return processModel;
     }
 
+    public void setNextTask(OutPort nextTask) {
+        this.nextTask = nextTask;
+    }
 }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index cabc014..eafa53d 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -1,23 +1,18 @@
 package org.apache.airavata.helix.impl.task;
 
 import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
-import org.apache.airavata.helix.task.api.annotation.TaskParam;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 @TaskDef(name = "Environment Setup Task")
 public class EnvSetupTask extends AiravataTask {
 
-    @TaskParam(name = "Working Directory")
-    private String workingDirectory;
-
-    @TaskOutPort(name = "Success Out Port")
-    private OutPort successPort;
+    private static final Logger logger = LogManager.getLogger(EnvSetupTask.class);
 
     @Override
     public TaskResult onRun(TaskHelper taskHelper) {
@@ -30,18 +25,19 @@ public class EnvSetupTask extends AiravataTask {
                     getTaskContext().getComputeResourceCredentialToken(),
                     getTaskContext().getComputeResourceLoginUserName());
 
-            adaptor.createDirectory(workingDirectory);
+            logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId());
+            adaptor.createDirectory(getTaskContext().getWorkingDir());
             publishTaskState(TaskState.COMPLETED);
-            return successPort.invoke(new TaskResult(TaskResult.Status.COMPLETED, "Successfully completed"));
+            return onSuccess("Successfully completed");
         } catch (Exception e) {
             try {
                 publishTaskState(TaskState.FAILED);
             } catch (RegistryException e1) {
-                publishErrors(e1);
+                logger.error("Task failed to publish task status", e1);
+
                 // ignore silently
             }
-            publishErrors(e);
-            return new TaskResult(TaskResult.Status.FAILED, "Failed the task");
+            return onFail("Failed to setup environment of task " + getTaskId(), true, e);
         }
     }
 
@@ -50,19 +46,4 @@ public class EnvSetupTask extends AiravataTask {
 
     }
 
-    public String getWorkingDirectory() {
-        return workingDirectory;
-    }
-
-    public void setWorkingDirectory(String workingDirectory) {
-        this.workingDirectory = workingDirectory;
-    }
-
-    public OutPort getSuccessPort() {
-        return successPort;
-    }
-
-    public void setSuccessPort(OutPort successPort) {
-        this.successPort = successPort;
-    }
 }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
index 397ff45..99db2c4 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -1,22 +1,66 @@
 package org.apache.airavata.helix.impl.workflow;
 
 import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.EnvSetupTask;
 import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
 import org.apache.airavata.helix.workflow.WorkflowManager;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class SimpleWorkflow {
 
     public static void main(String[] args) throws Exception {
 
-        EnvSetupTask envSetupTask = new EnvSetupTask();
-        envSetupTask.setWorkingDirectory("/tmp/a");
+        String processId = "PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6";
+        AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+        ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog();
 
-        DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
+        ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+        ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
+        String taskDag = processModel.getTaskDag();
+        List<TaskModel> taskList = processModel.getTasks();
+
+        String[] taskIds = taskDag.split(",");
+        final List<AiravataTask> allTasks = new ArrayList<>();
+
+        for (String taskId : taskIds) {
+            Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst();
+            model.ifPresent(taskModel -> {
+                AiravataTask airavataTask = null;
+                if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) {
+                    airavataTask = new EnvSetupTask();
+                } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
+                    airavataTask = new DefaultJobSubmissionTask();
+                }
+
+                if (airavataTask != null) {
+                    airavataTask.setGatewayId(experimentModel.getGatewayId());
+                    airavataTask.setExperimentId(experimentModel.getExperimentId());
+                    airavataTask.setProcessId(processModel.getProcessId());
+                    airavataTask.setTaskId(taskModel.getTaskId());
+                    if (allTasks.size() > 0) {
+                        allTasks.get(allTasks.size() -1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask));
+                    }
+                    allTasks.add(airavataTask);
+                }
+            });
+        }
+
+/*        DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
         defaultJobSubmissionTask.setGatewayId("default");
         defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
         defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
@@ -24,8 +68,8 @@ public class SimpleWorkflow {
 
         List<AbstractTask> tasks = new ArrayList<>();
         tasks.add(defaultJobSubmissionTask);
-
+*/
         WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
-        workflowManager.launchWorkflow(UUID.randomUUID().toString(), tasks, true);
+        workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true);
     }
 }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.