You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/11 22:34:19 UTC
airavata git commit: verify the job in remote resource after submit,
if both failed then mark experiment as failed and exit.
Repository: airavata
Updated Branches:
refs/heads/master 4a978d4f1 -> badaa732f
verify the job in remote resource after submit, if both failed then mark experiment as failed and exit.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/badaa732
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/badaa732
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/badaa732
Branch: refs/heads/master
Commit: badaa732ff527a0a4012fe824cb53db83954d24b
Parents: 4a978d4
Author: shamrath <sh...@gmail.com>
Authored: Mon May 11 16:34:15 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Mon May 11 16:34:15 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/utils/GFacUtils.java | 25 +++++++++++++++
.../gfac/ssh/provider/impl/SSHProvider.java | 32 +++++++++++++++++---
.../apache/airavata/gsi/ssh/api/Cluster.java | 8 +++++
.../ssh/api/job/JobManagerConfiguration.java | 2 ++
.../gsi/ssh/api/job/LSFJobConfiguration.java | 5 +++
.../gsi/ssh/api/job/LSFOutputParser.java | 20 ++++++++++++
.../airavata/gsi/ssh/api/job/OutputParser.java | 9 ++++++
.../gsi/ssh/api/job/PBSJobConfiguration.java | 6 ++++
.../gsi/ssh/api/job/PBSOutputParser.java | 20 ++++++++++++
.../gsi/ssh/api/job/SGEOutputParser.java | 6 ++++
.../gsi/ssh/api/job/SlurmJobConfiguration.java | 5 +++
.../gsi/ssh/api/job/SlurmOutputParser.java | 20 ++++++++++++
.../gsi/ssh/impl/GSISSHAbstractCluster.java | 10 ++++++
13 files changed, 163 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 1e9e212..5bcd75c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -41,9 +41,12 @@ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -1465,4 +1468,26 @@ public class GFacUtils {
buffer.flip();//need flip
return buffer.getLong();
}
+
+ /**
+  * Records a new status for the given experiment in the default registry.
+  * <p>
+  * If the stored experiment is already CANCELED or CANCELING, that state is kept and only
+  * the timestamp is refreshed; otherwise the requested state is written.
+  *
+  * @param experimentId id of the experiment to update
+  * @param state        state the experiment should move to
+  * @return the state that was actually written
+  * @throws RegistryException propagated from the registry calls
+  */
+ public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
+     Registry airavataRegistry = RegistryFactory.getDefaultRegistry();
+     Experiment details = (Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
+     if (details == null) {
+         details = new Experiment();
+         details.setExperimentID(experimentId);
+     }
+     // An experiment created just above (or one that never had a status recorded) has no
+     // status object, so read the current state defensively instead of dereferencing it.
+     org.apache.airavata.model.workspace.experiment.ExperimentStatus currentStatus = details.getExperimentStatus();
+     ExperimentState currentState = currentStatus == null ? null : currentStatus.getExperimentState();
+     boolean cancellation = ExperimentState.CANCELED.equals(currentState)
+             || ExperimentState.CANCELING.equals(currentState);
+
+     org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+     // Never overwrite a cancellation with another state.
+     status.setExperimentState(cancellation ? currentState : state);
+     status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+     details.setExperimentStatus(status);
+     log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
+     airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+     return details.getExperimentStatus().getExperimentState();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 0f88327..2a3287b 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -56,6 +56,7 @@ import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerTy
import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.xmlbeans.XmlException;
@@ -167,14 +168,24 @@ public class SSHProvider extends AbstractProvider {
jobDetails.setJobDescription(jobDescriptor.toXML());
String jobID = cluster.submitBatchJob(jobDescriptor);
+ if (jobID != null) {
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ }
jobExecutionContext.setJobDetails(jobDetails);
+ String verifyJobId = verifyJobSubmission(cluster, jobDetails);
+ if (verifyJobId != null) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
+ if (jobID == null) {
+ jobID = verifyJobId;
+ }
+ }
if (jobID == null) {
- jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- } else {
- jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ log.error("Couldn't find remote jobId for JobName:" + jobDetails.getJobName() + ", ExperimentId:" + jobExecutionContext.getExperimentID());
+ GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED);
+ return;
}
+ jobDetails.setJobID(jobID);
data.append("jobDesc=").append(jobDescriptor.toXML());
data.append(",jobId=").append(jobDetails.getJobID());
delegateToMonitorHandlers(jobExecutionContext);
@@ -203,6 +214,17 @@ public class SSHProvider extends AbstractProvider {
}
}
+ /**
+  * Looks up the remote job id for the job name held in {@code jobDetails}.
+  *
+  * @param cluster    cluster to query; its server info supplies the user name
+  * @param jobDetails details whose job name is looked up
+  * @return the job id reported by the cluster, or null if the lookup failed or found nothing
+  */
+ private String verifyJobSubmission(Cluster cluster, JobDetails jobDetails) {
+     String jobName = jobDetails.getJobName();
+     try {
+         return cluster.getJobIdByJobName(jobName, cluster.getServerInfo().getUserName());
+     } catch (SSHApiException e) {
+         // Best effort: the caller treats null as "not found", but keep the cause in the log.
+         log.error("Error while verifying JobId from JobName", e);
+         return null;
+     }
+ }
+
public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
index ed4b3b4..34e3b94 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
@@ -113,6 +113,14 @@ public interface Cluster {
* @throws SSHApiException throws exception during error
*/
public JobStatus getJobStatus(String jobID) throws SSHApiException;
+ /**
+ * This will get the job id of the job that was submitted under the given job name.
+ *
+ * @param jobName jobName of the job whose id the user wants
+ * @param userName user name handed to the job manager when building the lookup command
+ * @return jobId of the given jobName
+ * @throws SSHApiException throws exception during error
+ */
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
/**
* This method can be used to poll the jobstatuses based on the given
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
index 85a843e..d9b6b1c 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
@@ -32,6 +32,8 @@ public interface JobManagerConfiguration {
public RawCommandInfo getUserBasedMonitorCommand(String userName);
+ /**
+ * Builds the command used to look up a job id from a job name.
+ *
+ * @param jobName name of the job to look up
+ * @param userName user name available to the query; NOTE(review): of the implementations
+ * in this change only the PBS one uses it
+ * @return the raw command to run on the resource
+ */
+ public RawCommandInfo getJobIdMonitorCommand(String jobName , String userName);
+
public String getScriptExtension();
public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java
index 46fe9ad..740c9ac 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java
@@ -74,6 +74,11 @@ public class LSFJobConfiguration implements JobManagerConfiguration {
}
@Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+     // The job name is handed to bjobs through -J; userName is not used for this query.
+     final String lookupCommand = this.installedPath + "bjobs -J " + jobName;
+     return new RawCommandInfo(lookupCommand);
+ }
+
+ @Override
public String getScriptExtension() {
return scriptExtension;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java
index 71c3339..a621ae0 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java
@@ -29,6 +29,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class LSFOutputParser implements OutputParser {
private final static Logger logger = LoggerFactory.getLogger(LSFOutputParser.class);
@@ -90,6 +92,24 @@ public class LSFOutputParser implements OutputParser {
}
}
+ /**
+  * Finds the job id listed for {@code jobName} in {@code rawOutput}.
+  * <p>
+  * A match is a run of digits, whitespace, one word token, whitespace and then the job
+  * name. The name is quoted so that regex metacharacters in it are matched literally.
+  *
+  * @param jobName   name of the job whose id is wanted
+  * @param rawOutput raw command output to search
+  * @return the first matching job id, or null when rawOutput is null or nothing matches
+  * @throws SSHApiException declared by the interface; not thrown here
+  */
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws SSHApiException {
+     if (rawOutput == null) {
+         logger.error("Error: RawOutput shouldn't be null");
+         return null;
+     }
+     String regJobId = "jobId";
+     // regex - look ahead and match; Pattern.quote keeps the job name a literal
+     Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + Pattern.quote(jobName) + ")");
+     Matcher matcher = pattern.matcher(rawOutput);
+     if (matcher.find()) {
+         return matcher.group(regJobId);
+     }
+     logger.error("No match is found for JobName");
+     return null;
+ }
+
public static void main(String[] args) {
String test = "Job <2477982> is submitted to queue <short>.";
System.out.println(test.substring(test.indexOf("<")+1, test.indexOf(">")));
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
index b6f5f0a..fd37b6a 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
@@ -56,4 +56,13 @@ public interface OutputParser {
* @param rawOutput
*/
public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput)throws SSHApiException;
+
+ /**
+ * Filter the jobId value of the given jobName from rawOutput.
+ *
+ * @param jobName name of the job whose id is wanted
+ * @param rawOutput raw text output to search for the job
+ * @return the jobId found for jobName; the implementations in this change return null
+ * when nothing matches
+ * @throws SSHApiException declared so implementations can signal a parsing failure
+ */
+ public String parseJobId(String jobName, String rawOutput) throws SSHApiException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
index e935dfb..b658b16 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
@@ -97,6 +97,12 @@ public class PBSJobConfiguration implements JobManagerConfiguration {
}
@Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+     // PBS offers no lookup of job details by job name, so the query lists the
+     // user's jobs instead and jobName is left for the output parser to match.
+     final String lookupCommand = this.installedPath + "qstat -u " + userName;
+     return new RawCommandInfo(lookupCommand);
+ }
+
+ @Override
public String getBaseCancelCommand() {
return "qdel";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
index 8f2a606..3304465 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
@@ -30,6 +30,8 @@ import javax.validation.constraints.Null;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class PBSOutputParser implements OutputParser {
private static final Logger log = LoggerFactory.getLogger(PBSOutputParser.class);
@@ -190,5 +192,23 @@ public class PBSOutputParser implements OutputParser {
}
}
+ /**
+  * Finds the job id listed for {@code jobName} in {@code rawOutput}.
+  * <p>
+  * A match is a run of digits, whitespace, one word token, whitespace and then the job
+  * name. The name is quoted so that regex metacharacters in it are matched literally.
+  *
+  * @param jobName   name of the job whose id is wanted
+  * @param rawOutput raw command output to search
+  * @return the first matching job id, or null when rawOutput is null or nothing matches
+  * @throws SSHApiException declared by the interface; not thrown here
+  */
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws SSHApiException {
+     if (rawOutput == null) {
+         log.error("Error: RawOutput shouldn't be null");
+         return null;
+     }
+     String regJobId = "jobId";
+     // regex - look ahead and match; Pattern.quote keeps the job name a literal
+     Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + Pattern.quote(jobName) + ")");
+     Matcher matcher = pattern.matcher(rawOutput);
+     if (matcher.find()) {
+         return matcher.group(regJobId);
+     }
+     log.error("No match is found for JobName");
+     return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java
index 3fb5874..884b7f3 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gsi.ssh.api.job;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.impl.JobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,5 +164,10 @@ public class SGEOutputParser implements OutputParser{
}
}
+ /**
+ * Placeholder: job id lookup by job name is not implemented for this parser yet,
+ * so this always returns null regardless of the arguments.
+ */
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws SSHApiException {
+ return null; // TODO: Implement the parse logic ( with regex if possible ).
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
index 807ac42..a74b35d 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
@@ -96,6 +96,11 @@ public class SlurmJobConfiguration implements JobManagerConfiguration{
}
@Override
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) {
+     // The job name is handed to squeue through -n; userName is not used for this query.
+     final String lookupCommand = this.installedPath + "squeue -n " + jobName;
+     return new RawCommandInfo(lookupCommand);
+ }
+
+ @Override
public String getBaseCancelCommand() {
return "scancel";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
index 44a1068..64691dc 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
@@ -29,6 +29,8 @@ import javax.print.attribute.standard.JobState;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class SlurmOutputParser implements OutputParser {
private static final Logger log = LoggerFactory.getLogger(SlurmOutputParser.class);
@@ -187,4 +189,22 @@ public class SlurmOutputParser implements OutputParser {
}
}
}
+
+ /**
+  * Finds the job id listed for {@code jobName} in {@code rawOutput}.
+  * <p>
+  * A match is a run of digits, whitespace, one word token, whitespace and then the job
+  * name. The name is quoted so that regex metacharacters in it are matched literally.
+  *
+  * @param jobName   name of the job whose id is wanted
+  * @param rawOutput raw command output to search
+  * @return the first matching job id, or null when rawOutput is null or nothing matches
+  * @throws SSHApiException declared by the interface; not thrown here
+  */
+ @Override
+ public String parseJobId(String jobName, String rawOutput) throws SSHApiException {
+     if (rawOutput == null) {
+         log.error("Error: RawOutput shouldn't be null");
+         return null;
+     }
+     String regJobId = "jobId";
+     // regex - look ahead and match; Pattern.quote keeps the job name a literal
+     Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + Pattern.quote(jobName) + ")");
+     Matcher matcher = pattern.matcher(rawOutput);
+     if (matcher.find()) {
+         return matcher.group(regJobId);
+     }
+     log.error("No match is found for JobName");
+     return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 3d18013..9357706 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -425,6 +425,16 @@ public class GSISSHAbstractCluster implements Cluster {
return jobManagerConfiguration.getParser().parseJobStatus(jobID, result);
}
+ /**
+  * Resolves the job id for a job name by running the job manager's job-id lookup
+  * command over this cluster's session and handing the output to the configured parser.
+  *
+  * @param jobName  name of the job to look up
+  * @param userName user name passed to the job manager when building the command
+  * @return whatever the configured parser extracts from the command output
+  * @throws SSHApiException propagated from command execution, output retrieval or parsing
+  */
+ @Override
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+     RawCommandInfo rawCommandInfo = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+     StandardOutReader stdOutReader = new StandardOutReader();
+     CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
+     // Report the command that was actually executed instead of building it a second time.
+     String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !",
+             rawCommandInfo.getCommand());
+     return jobManagerConfiguration.getParser().parseJobId(jobName, result);
+ }
+
private static void logDebug(String message) {
if (log.isDebugEnabled()) {
log.debug(message);