You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:01 UTC
[airavata] 04/17: Fixing env setup task
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit cb54e4df2eb5ae453290a5d29cd3c0a8033c993d
Author: dimuthu <di...@gmail.com>
AuthorDate: Tue Feb 27 11:04:32 2018 -0500
Fixing env setup task
---
.../apache/airavata/helix/core/util/TaskUtil.java | 19 ++++----
.../airavata/helix/impl/task/AiravataTask.java | 10 ++--
.../airavata/helix/impl/task/EnvSetupTask.java | 37 ++++-----------
.../helix/impl/workflow/SimpleWorkflow.java | 54 ++++++++++++++++++++--
4 files changed, 75 insertions(+), 45 deletions(-)
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index d0f1ab6..218bd94 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -19,15 +19,18 @@ import java.util.Map;
*/
public class TaskUtil {
- public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T task) throws IllegalAccessException {
- Field[] fields = task.getClass().getDeclaredFields();
+ public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException {
+
List<OutPort> outPorts = new ArrayList<>();
- for (Field field : fields) {
- TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
- if (outPortAnnotation != null) {
- field.setAccessible(true);
- OutPort outPort = (OutPort) field.get(task);
- outPorts.add(outPort);
+ for (Class<?> c = taskObj.getClass(); c != null; c = c.getSuperclass()) {
+ Field[] fields = c.getDeclaredFields();
+ for (Field field : fields) {
+ TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class);
+ if (outPortAnnotation != null) {
+ field.setAccessible(true);
+ OutPort outPort = (OutPort) field.get(taskObj);
+ outPorts.add(outPort);
+ }
}
}
return outPorts;
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 315c07c..26361d2 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -51,14 +51,13 @@ public abstract class AiravataTask extends AbstractTask {
@TaskParam(name = "gatewayId")
private String gatewayId;
- @TaskOutPort(name = "Success Port")
- private OutPort onSuccess;
-
+ @TaskOutPort(name = "Next Task")
+ private OutPort nextTask;
protected TaskResult onSuccess(String message) {
String successMessage = "Task " + getTaskId() + " completed." + message != null ? " Message : " + message : "";
logger.info(successMessage);
- return onSuccess.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
+ return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message));
}
protected TaskResult onFail(String reason, boolean fatal, Throwable error) {
@@ -178,4 +177,7 @@ public abstract class AiravataTask extends AbstractTask {
return processModel;
}
+ public void setNextTask(OutPort nextTask) {
+ this.nextTask = nextTask;
+ }
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index cabc014..eafa53d 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -1,23 +1,18 @@
package org.apache.airavata.helix.impl.task;
import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
-import org.apache.airavata.helix.task.api.annotation.TaskParam;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
@TaskDef(name = "Environment Setup Task")
public class EnvSetupTask extends AiravataTask {
- @TaskParam(name = "Working Directory")
- private String workingDirectory;
-
- @TaskOutPort(name = "Success Out Port")
- private OutPort successPort;
+ private static final Logger logger = LogManager.getLogger(EnvSetupTask.class);
@Override
public TaskResult onRun(TaskHelper taskHelper) {
@@ -30,18 +25,19 @@ public class EnvSetupTask extends AiravataTask {
getTaskContext().getComputeResourceCredentialToken(),
getTaskContext().getComputeResourceLoginUserName());
- adaptor.createDirectory(workingDirectory);
+ logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId());
+ adaptor.createDirectory(getTaskContext().getWorkingDir());
publishTaskState(TaskState.COMPLETED);
- return successPort.invoke(new TaskResult(TaskResult.Status.COMPLETED, "Successfully completed"));
+ return onSuccess("Successfully completed");
} catch (Exception e) {
try {
publishTaskState(TaskState.FAILED);
} catch (RegistryException e1) {
- publishErrors(e1);
+ logger.error("Task failed to publish task status", e1);
+
// ignore silently
}
- publishErrors(e);
- return new TaskResult(TaskResult.Status.FAILED, "Failed the task");
+ return onFail("Failed to setup environment of task " + getTaskId(), true, e);
}
}
@@ -50,19 +46,4 @@ public class EnvSetupTask extends AiravataTask {
}
- public String getWorkingDirectory() {
- return workingDirectory;
- }
-
- public void setWorkingDirectory(String workingDirectory) {
- this.workingDirectory = workingDirectory;
- }
-
- public OutPort getSuccessPort() {
- return successPort;
- }
-
- public void setSuccessPort(OutPort successPort) {
- this.successPort = successPort;
- }
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
index 397ff45..99db2c4 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -1,22 +1,66 @@
package org.apache.airavata.helix.impl.workflow;
import org.apache.airavata.helix.core.AbstractTask;
+import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.EnvSetupTask;
import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
import org.apache.airavata.helix.workflow.WorkflowManager;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
public class SimpleWorkflow {
public static void main(String[] args) throws Exception {
- EnvSetupTask envSetupTask = new EnvSetupTask();
- envSetupTask.setWorkingDirectory("/tmp/a");
+ String processId = "PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6";
+ AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+ ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog();
- DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
+ ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+ ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId());
+ String taskDag = processModel.getTaskDag();
+ List<TaskModel> taskList = processModel.getTasks();
+
+ String[] taskIds = taskDag.split(",");
+ final List<AiravataTask> allTasks = new ArrayList<>();
+
+ for (String taskId : taskIds) {
+ Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst();
+ model.ifPresent(taskModel -> {
+ AiravataTask airavataTask = null;
+ if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) {
+ airavataTask = new EnvSetupTask();
+ } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
+ airavataTask = new DefaultJobSubmissionTask();
+ }
+
+ if (airavataTask != null) {
+ airavataTask.setGatewayId(experimentModel.getGatewayId());
+ airavataTask.setExperimentId(experimentModel.getExperimentId());
+ airavataTask.setProcessId(processModel.getProcessId());
+ airavataTask.setTaskId(taskModel.getTaskId());
+ if (allTasks.size() > 0) {
+ allTasks.get(allTasks.size() -1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask));
+ }
+ allTasks.add(airavataTask);
+ }
+ });
+ }
+
+/* DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask();
defaultJobSubmissionTask.setGatewayId("default");
defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a");
defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6");
@@ -24,8 +68,8 @@ public class SimpleWorkflow {
List<AbstractTask> tasks = new ArrayList<>();
tasks.add(defaultJobSubmissionTask);
-
+*/
WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199");
- workflowManager.launchWorkflow(UUID.randomUUID().toString(), tasks, true);
+ workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true);
}
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.