You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:03 UTC
[airavata] 06/17: Fixing bugs in pre workflow
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 573dbab1a29f1bf2f1fdb8c9cacdb7ad42b105ad
Author: dimuthu <di...@gmail.com>
AuthorDate: Fri Mar 2 13:16:49 2018 -0500
Fixing bugs in pre workflow
---
.../airavata/helix/agent/ssh/SshAgentAdaptor.java | 4 +-
.../apache/airavata/helix/core/AbstractTask.java | 10 ++++
.../airavata/helix/workflow/WorkflowManager.java | 2 +-
.../airavata/helix/impl/task/EnvSetupTask.java | 2 +-
.../airavata/helix/impl/task/TaskContext.java | 68 +++++++++++++++++++++-
.../impl/task/submission/GroovyMapBuilder.java | 4 +-
.../submission/task/DefaultJobSubmissionTask.java | 6 +-
.../task/submission/task/JobSubmissionTask.java | 4 +-
.../helix/impl/workflow/SimpleWorkflow.java | 5 +-
.../src/main/resources/application.properties | 2 +-
10 files changed, 92 insertions(+), 15 deletions(-)
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index 2ad2415..5392ab5 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -132,13 +132,12 @@ public class SshAgentAdaptor implements AgentAdaptor {
ChannelExec channelExec = null;
try {
channelExec = ((ChannelExec) session.openChannel("exec"));
- channelExec.setCommand(command);
+ channelExec.setCommand("cd " + workingDirectory + "; " + command);
channelExec.setInputStream(null);
InputStream out = channelExec.getInputStream();
InputStream err = channelExec.getErrStream();
channelExec.connect();
- commandOutput.setExitCode(channelExec.getExitStatus());
commandOutput.readStdOutFromStream(out);
commandOutput.readStdErrFromStream(err);
return commandOutput;
@@ -150,6 +149,7 @@ public class SshAgentAdaptor implements AgentAdaptor {
throw new AgentException(e);
} finally {
if (channelExec != null) {
+ commandOutput.setExitCode(channelExec.getExitStatus());
channelExec.disconnect();
}
}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index 04fa37f..5aca9cd 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -26,6 +26,8 @@ public abstract class AbstractTask extends UserContentStore implements Task {
private TaskCallbackContext callbackContext;
private TaskHelper taskHelper;
+ private int retryCount = 3;
+
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
@@ -105,4 +107,12 @@ public abstract class AbstractTask extends UserContentStore implements Task {
this.taskHelper = taskHelper;
return this;
}
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
}
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
index ab7e3c4..9ecafb9 100644
--- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -61,7 +61,7 @@ public class WorkflowManager {
JobConfig.Builder job = new JobConfig.Builder()
.addTaskConfigs(taskBuilds)
.setFailureThreshold(0)
- .setMaxAttemptsPerTask(3);
+ .setMaxAttemptsPerTask(data.getRetryCount());
if (!globalParticipant) {
job.setInstanceGroupTag(taskType);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index eafa53d..ddba5f2 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -28,7 +28,7 @@ public class EnvSetupTask extends AiravataTask {
logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId());
adaptor.createDirectory(getTaskContext().getWorkingDir());
publishTaskState(TaskState.COMPLETED);
- return onSuccess("Successfully completed");
+ return onSuccess("Envi setup task successfully completed " + getTaskId());
} catch (Exception e) {
try {
publishTaskState(TaskState.FAILED);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 64a7de8..489a196 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -13,6 +13,8 @@ import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescr
import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
@@ -23,11 +25,13 @@ import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.*;
public class TaskContext {
@@ -436,8 +440,38 @@ public class TaskContext {
this.resourceJobManager = resourceJobManager;
}
- public ResourceJobManager getResourceJobManager() {
- return resourceJobManager;
+ public ResourceJobManager getResourceJobManager() throws Exception {
+
+ if (this.resourceJobManager == null) {
+ JobSubmissionInterface jsInterface = getPreferredJobSubmissionInterface();
+
+ if (jsInterface == null) {
+ throw new Exception("Job Submission interface cannot be empty at this point");
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ SSHJobSubmission sshJobSubmission = getAppCatalog().getComputeResource().getSSHJobSubmission
+ (jsInterface.getJobSubmissionInterfaceId());
+ // context method.
+ resourceJobManager = sshJobSubmission.getResourceJobManager();
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
+ LOCALSubmission localSubmission = getAppCatalog().getComputeResource().getLocalJobSubmission
+ (jsInterface.getJobSubmissionInterfaceId());
+ resourceJobManager = localSubmission.getResourceJobManager();
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) {
+ SSHJobSubmission sshJobSubmission = getAppCatalog().getComputeResource().getSSHJobSubmission
+ (jsInterface.getJobSubmissionInterfaceId());
+ resourceJobManager = sshJobSubmission.getResourceJobManager();
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.CLOUD) {
+ return null;
+ } else {
+ throw new Exception("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
+ .name());
+ }
+
+ if (resourceJobManager == null) {
+ throw new Exception("Resource Job Manager is empty.");
+ }
+ }
+ return this.resourceJobManager;
}
public String getLocalWorkingDir() {
@@ -794,6 +828,36 @@ public class TaskContext {
.getApplicationInterface(processModel.getApplicationInterfaceId()));
ctx.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
(ctx.getComputeResourceId()));
+
+ List<OutputDataObjectType> applicationOutputs = ctx.getApplicationInterfaceDescription().getApplicationOutputs();
+ if (applicationOutputs != null && !applicationOutputs.isEmpty()) {
+ for (OutputDataObjectType outputDataObjectType : applicationOutputs) {
+ if (outputDataObjectType.getType().equals(DataType.STDOUT)) {
+ if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
+ String stdOut = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator)
+ + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stdout";
+ outputDataObjectType.setValue(stdOut);
+ ctx.setStdoutLocation(stdOut);
+ } else {
+ ctx.setStdoutLocation(outputDataObjectType.getValue());
+ }
+ }
+ if (outputDataObjectType.getType().equals(DataType.STDERR)) {
+ if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
+ String stderrLocation = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator)
+ + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stderr";
+ outputDataObjectType.setValue(stderrLocation);
+ ctx.setStderrLocation(stderrLocation);
+ } else {
+ ctx.setStderrLocation(outputDataObjectType.getValue());
+ }
+ }
+ }
+ }
+
+ // TODO move this somewhere else, as this is not the correct place to do it
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
+ processModel.setProcessOutputs(applicationOutputs);
return ctx;
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
index e4267ce..2119755 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
@@ -64,7 +64,7 @@ public class GroovyMapBuilder {
mapData.setInputs(inputValues);
List<String> inputValuesAll = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), false);
- inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false));
+ inputValuesAll.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false));
mapData.setInputsAll(inputValuesAll);
mapData.setUserName(taskContext.getComputeResourceLoginUserName());
@@ -103,7 +103,7 @@ public class GroovyMapBuilder {
mapData.setQueueName(scheduling.getQueueName());
}
if (totalNodeCount > 0) {
- mapData.setNodes(totalCPUCount);
+ mapData.setNodes(totalNodeCount);
}
if (totalCPUCount > 0) {
int ppn = totalCPUCount / totalNodeCount;
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index c85e18b..e21f200 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -46,6 +46,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setTaskId(getTaskId());
jobModel.setJobName(mapData.getJobName());
+ jobModel.setJobDescription("Sample description");
if (mapData != null) {
//jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
@@ -71,10 +72,11 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
statusList.add(new JobStatus(JobState.FAILED));
statusList.get(0).setReason(submissionOutput.getFailureReason());
jobModel.setJobStatuses(statusList);
- jobModel.setJobDescription("Sample description");
saveJobModel(jobModel);
logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
- getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName());
+ getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName()
+ + ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : "
+ + submissionOutput.isJobSubmissionFailed());
ErrorModel errorModel = new ErrorModel();
errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason());
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index b517af1..ac314e9 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -181,9 +181,9 @@ public abstract class JobSubmissionTask extends AiravataTask {
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ //getStatusPublisher().publish(msgCtx);
} catch (Exception e) {
- throw new Exception("Error persisting job status" + e.getLocalizedMessage(), e);
+ throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e);
}
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
index 63921db..abd36e1 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -27,7 +27,7 @@ public class SimpleWorkflow {
public static void main(String[] args) throws Exception {
- String processId = "PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6";
+ String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
AppCatalog appCatalog = RegistryFactory.getAppCatalog();
ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog();
@@ -51,10 +51,11 @@ public class SimpleWorkflow {
airavataTask = new EnvSetupTask();
} else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
airavataTask = new DefaultJobSubmissionTask();
+ airavataTask.setRetryCount(1);
jobSubmissionFound = true;
} else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
if (jobSubmissionFound) {
- airavataTask = new OutputDataStagingTask();
+ //airavataTask = new OutputDataStagingTask();
} else {
airavataTask = new InputDataStagingTask();
}
diff --git a/modules/helix-spectator/src/main/resources/application.properties b/modules/helix-spectator/src/main/resources/application.properties
index a9b0969..b4b8048 100644
--- a/modules/helix-spectator/src/main/resources/application.properties
+++ b/modules/helix-spectator/src/main/resources/application.properties
@@ -1,3 +1,3 @@
zookeeper.connection.url=localhost:2199
helix.cluster.name=AiravataDemoCluster
-participant.name=all-p2
\ No newline at end of file
+participant.name=all-p3
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.