You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/04/06 23:46:49 UTC
airavata git commit: Set email notification type to ALL in SLURM and
abe in PBS XSLT templates; improve email parsers.
Repository: airavata
Updated Branches:
refs/heads/emailMonitoring 9a34ebc10 -> cd6f1d980
Set the email notification type to ALL in the SLURM XSLT template and to abe in the PBS XSLT template; improve the email parsers.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cd6f1d98
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cd6f1d98
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cd6f1d98
Branch: refs/heads/emailMonitoring
Commit: cd6f1d9801fd66c9c877e96b5d5764540983c295
Parents: 9a34ebc
Author: shamrath <sh...@gmail.com>
Authored: Mon Apr 6 17:47:34 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Mon Apr 6 17:47:34 2015 -0400
----------------------------------------------------------------------
.../server/src/main/resources/PBSTemplate.xslt | 7 +--
.../src/main/resources/SLURMTemplate.xslt | 9 +---
.../core/monitor/mail/EmailBasedMonitor.java | 49 ++++++++++++++++----
.../monitor/mail/parser/PBSEmailParser.java | 28 +++++++++--
.../monitor/mail/parser/SLURMEmailParser.java | 2 +-
.../gfac/ssh/provider/impl/SSHProvider.java | 3 +-
.../airavata/gfac/ssh/util/GFACSSHUtils.java | 45 ++++++++++--------
7 files changed, 97 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/configuration/server/src/main/resources/PBSTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/PBSTemplate.xslt b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
index 033534e..559c30e 100644
--- a/modules/configuration/server/src/main/resources/PBSTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
@@ -25,12 +25,7 @@
#PBS -N <xsl:value-of select="ns:jobName"/>
</xsl:when>
</xsl:choose>
- <xsl:choose>
- <xsl:when test="ns:mailOptions != ''">
-#PBS -m <xsl:value-of select="ns:mailOptions"/>
- </xsl:when>
- </xsl:choose>
- <xsl:choose>
+#PBS -m abe <xsl:choose>
<xsl:when test="ns:mailAddress != ''">
#PBS -M <xsl:value-of select="ns:mailAddress"/>
</xsl:when>
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
index ecafbc2..a2e274d 100644
--- a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
@@ -31,15 +31,10 @@
</xsl:choose>
<xsl:choose>
<xsl:when test="ns:mailAddress != ''">
-#SBATCH -mail-user=<xsl:value-of select="ns:mailAddress"/>
+#SBATCH --mail-user=<xsl:value-of select="ns:mailAddress"/>
</xsl:when>
</xsl:choose>
- <xsl:choose>
- <xsl:when test="ns:mailType != ''">
-#SBATCH -mail-type=<xsl:value-of select="ns:mailType"/>
- </xsl:when>
- </xsl:choose>
- <xsl:choose>
+#SBATCH --mail-type=ALL <xsl:choose>
<xsl:when test="ns:acountString != ''">
#SBATCH -A <xsl:value-of select="ns:acountString"/>
</xsl:when>
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
index f798b2f..8adb860 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
@@ -22,6 +22,8 @@ package org.apache.airavata.gfac.core.monitor.mail;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -30,9 +32,10 @@ import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
import org.apache.airavata.gfac.core.monitor.mail.parser.EmailParser;
import org.apache.airavata.gfac.core.monitor.mail.parser.PBSEmailParser;
import org.apache.airavata.gfac.core.monitor.mail.parser.SLURMEmailParser;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.model.workspace.experiment.JobStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.mail.Address;
import javax.mail.Flags;
@@ -48,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class EmailBasedMonitor implements Runnable{
- private static final Logger log = LoggerFactory.getLogger(EmailBasedMonitor.class);
+ private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
private static final String PBS_CONSULT_SDSC_EDU = "pbsconsult@sdsc.edu";
private static final String SLURM_BATCH_STAMPEDE = "slurm@batch1.stampede.tacc.utexas.edu";
@@ -136,9 +139,9 @@ public class EmailBasedMonitor implements Runnable{
for (Message message : searchMessages) {
try {
JobStatusResult jobStatusResult = parse(message);
- updateJobStatus(jobStatusResult);
+ process(jobStatusResult);
} catch (AiravataException e) {
- log.error("Error parsing email message =====================================>");
+ log.error("Error parsing email message =====================================>", e);
try {
writeEnvelopeOnError(message);
} catch (MessagingException e1) {
@@ -164,13 +167,41 @@ public class EmailBasedMonitor implements Runnable{
}
}
- private void updateJobStatus(JobStatusResult jobStatusResult) throws AiravataException {
- JobExecutionContext jEC = jobMonitorMap.remove(jobStatusResult.getJobId());
+ private void process(JobStatusResult jobStatusResult) throws AiravataException {
+ JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
if (jEC == null) {
throw new AiravataException("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
}
- jEC.getJobDetails().setJobStatus(new JobStatus(jobStatusResult.getState()));
- GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher));
+ JobState resultState = jobStatusResult.getState();
+ jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
+ if (resultState == JobState.COMPLETE) {
+ GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher));
+ }else if (resultState == JobState.QUEUED) {
+ // TODO - publish queued rabbitmq message
+ }else if (resultState == JobState.FAILED) {
+ // TODO - handle failed scenario
+ jobMonitorMap.remove(jobStatusResult.getJobId());
+ log.info("Job failed email received , removed job from job monitoring");
+// monitorPublisher.publish(jEC.getJobDetails().getJobStatus());
+ }
+ publishJobStatusChange(jEC);
+ }
+
+ private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {
+ JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+ JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+ jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ jobStatus.setJobIdentity(jobIdentity);
+ jobStatus.setState(jobExecutionContext.getJobDetails().getJobStatus().getJobState());
+ // we have this JobStatus class to handle amqp monitoring
+ log.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
+ "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
+ jobStatus.getJobIdentity().getTaskId());
+
+ monitorPublisher.publish(jobStatus);
}
private void writeEnvelopeOnError(Message m) throws MessagingException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
index 64c78c1..37d2317 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
@@ -38,8 +38,10 @@ public class PBSEmailParser implements EmailParser {
private static final String STATUS = "status";
private static final String JOBID = "jobId";
+ private static final String EXIT_STATUS = "exitStatus";
private static final String REGEX = "[a-zA-Z: ]*(?<" + JOBID + ">[a-zA-Z0-9-\\.]*)\\s+.*\\s+.*\\s+(?<"
+ STATUS + ">[a-zA-Z\\ ]*)";
+ private static final String REGEX_EXIT_STATUS = "Exit_status=(?<" + EXIT_STATUS + ">[\\d]+)";
@Override
public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
@@ -51,7 +53,7 @@ public class PBSEmailParser implements EmailParser {
if (matcher.find()) {
jobStatusResult.setJobId(matcher.group(JOBID));
String statusLine = matcher.group(STATUS);
- jobStatusResult.setState(getJobState(statusLine));
+ jobStatusResult.setState(getJobState(statusLine, content));
return jobStatusResult;
} else {
log.error("No matched found for content => \n" + content);
@@ -63,17 +65,37 @@ public class PBSEmailParser implements EmailParser {
return null;
}
- private JobState getJobState(String statusLine) {
+ private JobState getJobState(String statusLine, String content) {
switch (statusLine) {
case "Begun execution":
return JobState.QUEUED;
case "Execution terminated":
- return JobState.COMPLETE;
+ int exitStatus = getExitStatus(content);
+ switch (exitStatus) {
+ case 0:
+ return JobState.COMPLETE;
+ case 1:
+ return JobState.FAILED;
+ default:
+ return JobState.UNKNOWN;
+ }
default:
return JobState.UNKNOWN;
}
}
+ private int getExitStatus(String content) {
+ Pattern pattern = Pattern.compile(REGEX_EXIT_STATUS);
+ Matcher matcher = pattern.matcher(content);
+ if (matcher.find()) {
+ String group = matcher.group(EXIT_STATUS);
+ if (group != null && !group.trim().isEmpty()) {
+ return Integer.valueOf(group.trim());
+ }
+ }
+ return -1;
+ }
+
/* -----------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
index 8c6dbed..e5af721 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
@@ -61,7 +61,7 @@ public class SLURMEmailParser implements EmailParser {
}
private JobState getJobState(String state) {
- switch (state) {
+ switch (state.trim()) {
case "Began":
return JobState.QUEUED;
case "Ended":
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index f628b4e..0012ce1 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -385,7 +385,8 @@ public class SSHProvider extends AbstractProvider {
", execution is configured as asynchronous, so Outhandler will not be invoked");
}*/
try {
- EmailBasedMonitor.getInstant(((MonitorPublisher) jobExecutionContext.getProperty("MonitorPubliser")));
+ EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(((MonitorPublisher) jobExecutionContext.getProperty("MonitorPublisher")));
+ emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
} catch (ApplicationSettingsException e) {
throw new GFacHandlerException("Error while delegating job execution context to email based monitor");
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 588183e..9628585 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -290,25 +290,32 @@ public class GFACSSHUtils {
}
}
try {
- if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")){
- jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS));
- String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
-
- if(taskData.isEnableEmailNotification()){
- List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
- String elist = GFacUtils.listToCsv(emailList, ',');
- if(emailids != null && !emailids.isEmpty()){
- emailids = emailids +"," + elist;
- }else{
- emailids = elist;
- }
- }
- if(emailids != null && !emailids.isEmpty()){
- logger.info("Email list: "+ emailids);
- jobDescriptor.setMailAddress(emailids);
- }
- }
- } catch (ApplicationSettingsException e) {
+ if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+ String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+ if (flags != null && jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+ flags = "ALL";
+ }
+ jobDescriptor.setMailOptions(flags);
+
+ String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+
+ if (taskData.isEnableEmailNotification()) {
+ List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
+ String elist = GFacUtils.listToCsv(emailList, ',');
+ if (elist != null && !elist.isEmpty()) {
+ if (emailids != null && !emailids.isEmpty()) {
+ emailids = emailids + "," + elist;
+ } else {
+ emailids = elist;
+ }
+ }
+ }
+ if (emailids != null && !emailids.isEmpty()) {
+ logger.info("Email list: " + emailids);
+ jobDescriptor.setMailAddress(emailids);
+ }
+ }
+ } catch (ApplicationSettingsException e) {
logger.error("ApplicationSettingsException : " +e.getLocalizedMessage());
}
// this is common for any application descriptor