You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/01/13 18:03:10 UTC

[33/42] airavata git commit: Identify job submission failures using output parsers

Identify job submission failures using output parsers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/843940fa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/843940fa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/843940fa

Branch: refs/heads/develop
Commit: 843940fa0bdde4e5cca397e8acdf38125c92ee16
Parents: 4792eac
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Jan 8 16:03:26 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Jan 8 16:03:26 2016 -0500

----------------------------------------------------------------------
 .../gfac/core/cluster/JobSubmissionOutput.java  |  18 +++
 .../gfac/core/cluster/OutputParser.java         |   8 ++
 .../airavata/gfac/impl/HPCRemoteCluster.java    |   6 +
 .../gfac/impl/job/ForkOutputParser.java         |   5 +
 .../airavata/gfac/impl/job/LSFOutputParser.java |   5 +
 .../airavata/gfac/impl/job/PBSOutputParser.java |   5 +
 .../gfac/impl/job/SlurmOutputParser.java        |   7 ++
 .../airavata/gfac/impl/job/UGEOutputParser.java |   5 +
 .../impl/task/DefaultJobSubmissionTask.java     | 112 +++++++++++--------
 9 files changed, 123 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
index 6632ab5..d912409 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
@@ -27,6 +27,8 @@ public class JobSubmissionOutput {
 	private String stdErr;
 	private String command;
 	private String jobId;
+	private boolean isJobSubmissionFailed;
+	private String failureReason;
 
 	public int getExitCode() {
 		return exitCode;
@@ -67,4 +69,20 @@ public class JobSubmissionOutput {
 	public void setJobId(String jobId) {
 		this.jobId = jobId;
 	}
+
+	public boolean isJobSubmissionFailed() {
+		return isJobSubmissionFailed;
+	}
+
+	public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+		isJobSubmissionFailed = jobSubmissionFailed;
+	}
+
+	public String getFailureReason() {
+		return failureReason;
+	}
+
+	public void setFailureReason(String failureReason) {
+		this.failureReason = failureReason;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
index 521e23f..18de355 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
@@ -44,6 +44,14 @@ public interface OutputParser {
 
 
     /**
+     * Parse the output returned by the job submission task and identify job submission failures.
+     * @param rawOutput
+     * @return true if the job submission has failed, false otherwise.
+     */
+    public boolean isJobSubmissionFailed(String rawOutput);
+
+
+    /**
      * This can be used to get the job status from the output
      * @param jobID
      * @param rawOutput

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 8c4a4c0..022c8bc 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -91,6 +91,12 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 		executeCommand(submitCommand, reader);
 //		throwExceptionOnError(reader, submitCommand);
 		jsoutput.setJobId(outputParser.parseJobSubmission(reader.getStdOutputString()));
+		if (jsoutput.getJobId() == null) {
+			if (outputParser.isJobSubmissionFailed(reader.getStdOutputString())) {
+				jsoutput.setJobSubmissionFailed(true);
+				jsoutput.setFailureReason(reader.getStdOutputString());
+			}
+		}
 		jsoutput.setExitCode(reader.getExitCode());
 		jsoutput.setStdOut(reader.getStdOutputString());
 		jsoutput.setStdErr(reader.getStdErrorString());

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
index 72856e5..b99db30 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
@@ -44,6 +44,11 @@ public class ForkOutputParser implements OutputParser {
     }
 
     @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    @Override
     public JobStatus parseJobStatus(String jobID, String rawOutput) throws SSHApiException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
index b16aa9b..bb0ae46 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
@@ -52,6 +52,11 @@ public class LSFOutputParser implements OutputParser {
     }
 
     @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    @Override
     public JobStatus parseJobStatus(String jobID, String rawOutput) throws SSHApiException {
         boolean jobFount = false;
         logger.debug(rawOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
index f187724..7f97a68 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
@@ -135,6 +135,11 @@ public class PBSOutputParser implements OutputParser {
         return jobId;  //In PBS stdout is going to be directly the jobID
     }
 
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
     public JobStatus parseJobStatus(String jobID, String rawOutput) {
         boolean jobFount = false;
         log.debug(rawOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
index fecb5e7..dff0a9b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
@@ -115,6 +115,13 @@ public class SlurmOutputParser implements OutputParser {
 	    return "";
     }
 
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        Pattern pattern = Pattern.compile("FAILED");
+        Matcher matcher = pattern.matcher(rawOutput);
+        return matcher.find();
+    }
+
     public JobStatus parseJobStatus(String jobID, String rawOutput) throws SSHApiException {
         log.info(rawOutput);
         Pattern pattern = Pattern.compile(jobID + "(?=\\s+\\S+\\s+\\S+\\s+\\S+\\s+(?<" + STATUS + ">\\w+))");

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
index 0ece2d9..f19d4f7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
@@ -135,6 +135,11 @@ public class UGEOutputParser implements OutputParser {
 		}
 	}
 
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
     public JobStatus parseJobStatus(String jobID, String rawOutput) {
         Pattern pattern = Pattern.compile("job_number:[\\s]+" + jobID);
         Matcher matcher = pattern.matcher(rawOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 68d3bac..ebdda13 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -82,40 +82,56 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 				jobModel.setStdErr(jobSubmissionOutput.getStdErr());
 				jobModel.setStdOut(jobSubmissionOutput.getStdOut());
 				String jobId = jobSubmissionOutput.getJobId();
-				if (exitCode != 0 && jobId == null) {
+				if (exitCode != 0 || jobSubmissionOutput.isJobSubmissionFailed()) {
 					jobModel.setJobId(DEFAULT_JOB_ID);
-					GFacUtils.saveJobModel(processContext, jobModel);
-					String msg;
-					if (exitCode != Integer.MIN_VALUE) {
-						msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
-								processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
-								" return non zero exit code:" + exitCode + "  for JobName:" + jobModel.getJobName() +
-								", Hence changing job state to Failed";
+					if (jobSubmissionOutput.isJobSubmissionFailed()) {
+						jobModel.setJobStatus(new JobStatus(JobState.FAILED));
+						jobModel.getJobStatus().setReason(jobSubmissionOutput.getFailureReason());
+						log.error("expId: {}, processid: {}, taskId: {} :- Job submission failed for job name {}",
+								taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), jobModel.getJobName());
+						ErrorModel errorModel = new ErrorModel();
+						errorModel.setUserFriendlyMessage(jobSubmissionOutput.getFailureReason());
+						errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+						GFacUtils.saveExperimentError(processContext, errorModel);
+						GFacUtils.saveProcessError(processContext, errorModel);
+						GFacUtils.saveTaskError(taskContext, errorModel);
+						taskStatus.setState(TaskState.FAILED);
+						taskStatus.setReason("Job submission command exit with non zero exit code");
+						taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+						taskContext.setTaskStatus(taskStatus);
 					} else {
-						msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
-								processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
-								" doesn't  return valid job submission exit code for JobName:" + jobModel.getJobName() +
-								", Hence changing job state to Failed";
+						String msg;
+						GFacUtils.saveJobModel(processContext, jobModel);
+						if (exitCode != Integer.MIN_VALUE) {
+							msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+									processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+									" return non zero exit code:" + exitCode + "  for JobName:" + jobModel.getJobName() +
+									", Hence changing job state to Failed";
+						} else {
+							msg = "expId:" + processContext.getProcessModel().getExperimentId() + ", processId:" +
+									processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+									" doesn't  return valid job submission exit code for JobName:" + jobModel.getJobName() +
+									", Hence changing job state to Failed";
+						}
+						log.error(msg);
+						ErrorModel errorModel = new ErrorModel();
+						errorModel.setUserFriendlyMessage(msg);
+						errorModel.setActualErrorMessage(msg);
+						GFacUtils.saveExperimentError(processContext, errorModel);
+						GFacUtils.saveProcessError(processContext, errorModel);
+						GFacUtils.saveTaskError(taskContext, errorModel);
+						taskStatus.setState(TaskState.FAILED);
+						taskStatus.setReason("Job submission command exit with non zero exit code");
+						taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+						taskContext.setTaskStatus(taskStatus);
 					}
-					log.error(msg);
-					ErrorModel errorModel = new ErrorModel();
-					errorModel.setUserFriendlyMessage(msg);
-					errorModel.setActualErrorMessage(msg);
-					GFacUtils.saveExperimentError(processContext, errorModel);
-					GFacUtils.saveProcessError(processContext, errorModel);
-					GFacUtils.saveTaskError(taskContext, errorModel);
-					taskStatus.setState(TaskState.FAILED);
-					taskStatus.setReason("Job submission command exit with non zero exit code");
-					taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-					taskContext.setTaskStatus(taskStatus);
 					try {
 						GFacUtils.saveAndPublishTaskStatus(taskContext);
 					} catch (GFacException e) {
 						log.error("Error while saving task status", e);
 					}
 					return taskStatus;
-				}
-			    if (jobId != null && !jobId.isEmpty()) {
+				} else if (jobId != null && !jobId.isEmpty()) {
 				    jobModel.setJobId(jobId);
 				    GFacUtils.saveJobModel(processContext, jobModel);
 				    jobStatus.setJobState(JobState.SUBMITTED);
@@ -134,29 +150,29 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 				    taskStatus = new TaskStatus(TaskState.COMPLETED);
 				    taskStatus.setReason("Submitted job to compute resource");
                     taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-			    } else {
-				    int verificationTryCount = 0;
-				    while (verificationTryCount++ < 3) {
-					    String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
-					    if (verifyJobId != null && !verifyJobId.isEmpty()) {
-						    // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
-						    jobId = verifyJobId;
-						    jobModel.setJobId(jobId);
-						    GFacUtils.saveJobModel(processContext,jobModel);
-						    jobStatus.setJobState(JobState.QUEUED);
-						    jobStatus.setReason("Verification step succeeded");
-                            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-						    jobModel.setJobStatus(jobStatus);
-						    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
-						    taskStatus.setState(TaskState.COMPLETED);
-						    taskStatus.setReason("Submitted job to compute resource");
-                            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-						    break;
-					    }
-					    log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
-					    Thread.sleep(verificationTryCount * 10000);
-				    }
-			    }
+				} else {
+					int verificationTryCount = 0;
+					while (verificationTryCount++ < 3) {
+						String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+						if (verifyJobId != null && !verifyJobId.isEmpty()) {
+							// JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+							jobId = verifyJobId;
+							jobModel.setJobId(jobId);
+							GFacUtils.saveJobModel(processContext, jobModel);
+							jobStatus.setJobState(JobState.QUEUED);
+							jobStatus.setReason("Verification step succeeded");
+							jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+							jobModel.setJobStatus(jobStatus);
+							GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+							taskStatus.setState(TaskState.COMPLETED);
+							taskStatus.setReason("Submitted job to compute resource");
+							taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+							break;
+						}
+						log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+						Thread.sleep(verificationTryCount * 10000);
+					}
+				}
 
 			    if (jobId == null || jobId.isEmpty()) {
 					jobModel.setJobId(DEFAULT_JOB_ID);