You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/01/13 18:03:10 UTC
[33/42] airavata git commit: Identify job submission failures using
output parsers
Identify job submission failures using output parsers
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/843940fa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/843940fa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/843940fa
Branch: refs/heads/develop
Commit: 843940fa0bdde4e5cca397e8acdf38125c92ee16
Parents: 4792eac
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Jan 8 16:03:26 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Jan 8 16:03:26 2016 -0500
----------------------------------------------------------------------
.../gfac/core/cluster/JobSubmissionOutput.java | 18 +++
.../gfac/core/cluster/OutputParser.java | 8 ++
.../airavata/gfac/impl/HPCRemoteCluster.java | 6 +
.../gfac/impl/job/ForkOutputParser.java | 5 +
.../airavata/gfac/impl/job/LSFOutputParser.java | 5 +
.../airavata/gfac/impl/job/PBSOutputParser.java | 5 +
.../gfac/impl/job/SlurmOutputParser.java | 7 ++
.../airavata/gfac/impl/job/UGEOutputParser.java | 5 +
.../impl/task/DefaultJobSubmissionTask.java | 112 +++++++++++--------
9 files changed, 123 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
index 6632ab5..d912409 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
@@ -27,6 +27,8 @@ public class JobSubmissionOutput {
private String stdErr;
private String command;
private String jobId;
+ private boolean isJobSubmissionFailed;
+ private String failureReason;
public int getExitCode() {
return exitCode;
@@ -67,4 +69,20 @@ public class JobSubmissionOutput {
public void setJobId(String jobId) {
this.jobId = jobId;
}
+
+ /**
+ * Returns whether the job submission was flagged as failed.
+ * @return true if the submission was marked as failed, false otherwise
+ */
+ public boolean isJobSubmissionFailed() {
+ return isJobSubmissionFailed;
+ }
+
+ /**
+ * Marks the job submission as failed or not failed.
+ * @param jobSubmissionFailed true if the submission failed
+ */
+ public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+ // Use an explicit "this." for consistency with the other setters of this class.
+ this.isJobSubmissionFailed = jobSubmissionFailed;
+ }
+
+ /**
+ * Returns the reason recorded for a failed submission.
+ * @return the failure reason, or null if none was set
+ */
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ /**
+ * Records a human-readable reason for a failed submission.
+ * @param failureReason description of the failure (for example, the raw submission output)
+ */
+ public void setFailureReason(String failureReason) {
+ this.failureReason = failureReason;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
index 521e23f..18de355 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
@@ -44,6 +44,14 @@ public interface OutputParser {
/**
+ * Parses the raw output returned by the job submission command and identifies job submission failures.
+ * @param rawOutput raw output text of the job submission command
+ * @return true if the job submission failed, false otherwise.
+ */
+ public boolean isJobSubmissionFailed(String rawOutput);
+
+
+ /**
* This can be used to get the job status from the output
* @param jobID
* @param rawOutput
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 8c4a4c0..022c8bc 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -91,6 +91,12 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
executeCommand(submitCommand, reader);
// throwExceptionOnError(reader, submitCommand);
jsoutput.setJobId(outputParser.parseJobSubmission(reader.getStdOutputString()));
+ if (jsoutput.getJobId() == null) {
+ if (outputParser.isJobSubmissionFailed(reader.getStdOutputString())) {
+ jsoutput.setJobSubmissionFailed(true);
+ jsoutput.setFailureReason(reader.getStdOutputString());
+ }
+ }
jsoutput.setExitCode(reader.getExitCode());
jsoutput.setStdOut(reader.getStdOutputString());
jsoutput.setStdErr(reader.getStdErrorString());
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
index 72856e5..b99db30 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
@@ -44,6 +44,11 @@ public class ForkOutputParser implements OutputParser {
}
@Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ // Output-based failure detection is not implemented for fork submissions: always reports "not failed".
+ // NOTE(review): failures are then only caught via the exit code / missing job id checks — TODO confirm.
+ return false;
+ }
+
+ @Override
public JobStatus parseJobStatus(String jobID, String rawOutput) throws SSHApiException {
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
index b16aa9b..bb0ae46 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
@@ -52,6 +52,11 @@ public class LSFOutputParser implements OutputParser {
}
@Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ // Output-based failure detection is not implemented for LSF: always reports "not failed".
+ // NOTE(review): TODO confirm whether bsub error output should be parsed here.
+ return false;
+ }
+
+ @Override
public JobStatus parseJobStatus(String jobID, String rawOutput) throws SSHApiException {
boolean jobFount = false;
logger.debug(rawOutput);
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
index f187724..7f97a68 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
@@ -135,6 +135,11 @@ public class PBSOutputParser implements OutputParser {
return jobId; //In PBS stdout is going to be directly the jobID
}
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ // Output-based failure detection is not implemented for PBS: always reports "not failed".
+ // NOTE(review): TODO confirm whether qsub error output should be parsed here.
+ return false;
+ }
+
public JobStatus parseJobStatus(String jobID, String rawOutput) {
boolean jobFount = false;
log.debug(rawOutput);
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
index fecb5e7..dff0a9b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
@@ -115,6 +115,13 @@ public class SlurmOutputParser implements OutputParser {
return "";
}
+ /**
+ * Reports whether the raw submission output contains a failure marker ("failed", in any letter case).
+ * NOTE(review): sbatch presumably reports errors as "sbatch: error: Batch job submission failed: ...",
+ * i.e. in lower case, hence the case-insensitive match — confirm against the target cluster.
+ * @param rawOutput raw output text of the job submission command; may be null
+ * @return true if a failure marker was found, false otherwise (including when rawOutput is null)
+ */
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ if (rawOutput == null) {
+ // Nothing to inspect; Pattern.matcher(null) would otherwise throw a NullPointerException.
+ return false;
+ }
+ // CASE_INSENSITIVE (without UNICODE_CASE) folds ASCII only, so the match does not depend on the default locale.
+ Matcher matcher = Pattern.compile("FAILED", Pattern.CASE_INSENSITIVE).matcher(rawOutput);
+ return matcher.find();
+ }
+
public JobStatus parseJobStatus(String jobID, String rawOutput) throws SSHApiException {
log.info(rawOutput);
Pattern pattern = Pattern.compile(jobID + "(?=\\s+\\S+\\s+\\S+\\s+\\S+\\s+(?<" + STATUS + ">\\w+))");
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
index 0ece2d9..f19d4f7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
@@ -135,6 +135,11 @@ public class UGEOutputParser implements OutputParser {
}
}
+ @Override
+ public boolean isJobSubmissionFailed(String rawOutput) {
+ // Output-based failure detection is not implemented for UGE: always reports "not failed".
+ // NOTE(review): TODO confirm whether qsub error output should be parsed here.
+ return false;
+ }
+
public JobStatus parseJobStatus(String jobID, String rawOutput) {
Pattern pattern = Pattern.compile("job_number:[\\s]+" + jobID);
Matcher matcher = pattern.matcher(rawOutput);
http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 68d3bac..ebdda13 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -82,40 +82,56 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
jobModel.setStdErr(jobSubmissionOutput.getStdErr());
jobModel.setStdOut(jobSubmissionOutput.getStdOut());
String jobId = jobSubmissionOutput.getJobId();
- if (exitCode != 0 && jobId == null) {
+ if (exitCode != 0 || jobSubmissionOutput.isJobSubmissionFailed()) {
jobModel.setJobId(DEFAULT_JOB_ID);
- GFacUtils.saveJobModel(processContext, jobModel);
- String msg;
- if (exitCode != Integer.MIN_VALUE) {
- msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
- processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
- " return non zero exit code:" + exitCode + " for JobName:" + jobModel.getJobName() +
- ", Hence changing job state to Failed";
+ if (jobSubmissionOutput.isJobSubmissionFailed()) {
+ jobModel.setJobStatus(new JobStatus(JobState.FAILED));
+ jobModel.getJobStatus().setReason(jobSubmissionOutput.getFailureReason());
+ log.error("expId: {}, processid: {}, taskId: {} :- Job submission failed for job name {}",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), jobModel.getJobName());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage(jobSubmissionOutput.getFailureReason());
+ errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+ GFacUtils.saveExperimentError(processContext, errorModel);
+ GFacUtils.saveProcessError(processContext, errorModel);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason("Job submission command exit with non zero exit code");
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
} else {
- msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
- processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
- " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
- ", Hence changing job state to Failed";
+ String msg;
+ GFacUtils.saveJobModel(processContext, jobModel);
+ if (exitCode != Integer.MIN_VALUE) {
+ msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+ processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+ " return non zero exit code:" + exitCode + " for JobName:" + jobModel.getJobName() +
+ ", Hence changing job state to Failed";
+ } else {
+ msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+ processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+ " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() +
+ ", Hence changing job state to Failed";
+ }
+ log.error(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage(msg);
+ errorModel.setActualErrorMessage(msg);
+ GFacUtils.saveExperimentError(processContext, errorModel);
+ GFacUtils.saveProcessError(processContext, errorModel);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason("Job submission command exit with non zero exit code");
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
}
- log.error(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage(msg);
- errorModel.setActualErrorMessage(msg);
- GFacUtils.saveExperimentError(processContext, errorModel);
- GFacUtils.saveProcessError(processContext, errorModel);
- GFacUtils.saveTaskError(taskContext, errorModel);
- taskStatus.setState(TaskState.FAILED);
- taskStatus.setReason("Job submission command exit with non zero exit code");
- taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskContext.setTaskStatus(taskStatus);
try {
GFacUtils.saveAndPublishTaskStatus(taskContext);
} catch (GFacException e) {
log.error("Error while saving task status", e);
}
return taskStatus;
- }
- if (jobId != null && !jobId.isEmpty()) {
+ } else if (jobId != null && !jobId.isEmpty()) {
jobModel.setJobId(jobId);
GFacUtils.saveJobModel(processContext, jobModel);
jobStatus.setJobState(JobState.SUBMITTED);
@@ -134,29 +150,29 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
taskStatus = new TaskStatus(TaskState.COMPLETED);
taskStatus.setReason("Submitted job to compute resource");
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- } else {
- int verificationTryCount = 0;
- while (verificationTryCount++ < 3) {
- String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
- if (verifyJobId != null && !verifyJobId.isEmpty()) {
- // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
- jobId = verifyJobId;
- jobModel.setJobId(jobId);
- GFacUtils.saveJobModel(processContext,jobModel);
- jobStatus.setJobState(JobState.QUEUED);
- jobStatus.setReason("Verification step succeeded");
- jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- jobModel.setJobStatus(jobStatus);
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
- taskStatus.setState(TaskState.COMPLETED);
- taskStatus.setReason("Submitted job to compute resource");
- taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- break;
- }
- log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
- Thread.sleep(verificationTryCount * 10000);
- }
- }
+ } else {
+ int verificationTryCount = 0;
+ while (verificationTryCount++ < 3) {
+ String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+ if (verifyJobId != null && !verifyJobId.isEmpty()) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ jobId = verifyJobId;
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobModel(processContext, jobModel);
+ jobStatus.setJobState(JobState.QUEUED);
+ jobStatus.setReason("Verification step succeeded");
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatus(jobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ taskStatus.setState(TaskState.COMPLETED);
+ taskStatus.setReason("Submitted job to compute resource");
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ break;
+ }
+ log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+ Thread.sleep(verificationTryCount * 10000);
+ }
+ }
if (jobId == null || jobId.isEmpty()) {
jobModel.setJobId(DEFAULT_JOB_ID);