You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/04/06 23:46:49 UTC

airavata git commit: Set email notification type to ALL in the SLURM and to abe in the PBS XSLT templates; improve email parsers.

Repository: airavata
Updated Branches:
  refs/heads/emailMonitoring 9a34ebc10 -> cd6f1d980


Set the email notification type to ALL in the SLURM XSLT template and to abe (abort, begin, end) in the PBS XSLT template; improve the email parsers.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cd6f1d98
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cd6f1d98
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cd6f1d98

Branch: refs/heads/emailMonitoring
Commit: cd6f1d9801fd66c9c877e96b5d5764540983c295
Parents: 9a34ebc
Author: shamrath <sh...@gmail.com>
Authored: Mon Apr 6 17:47:34 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Mon Apr 6 17:47:34 2015 -0400

----------------------------------------------------------------------
 .../server/src/main/resources/PBSTemplate.xslt  |  7 +--
 .../src/main/resources/SLURMTemplate.xslt       |  9 +---
 .../core/monitor/mail/EmailBasedMonitor.java    | 49 ++++++++++++++++----
 .../monitor/mail/parser/PBSEmailParser.java     | 28 +++++++++--
 .../monitor/mail/parser/SLURMEmailParser.java   |  2 +-
 .../gfac/ssh/provider/impl/SSHProvider.java     |  3 +-
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    | 45 ++++++++++--------
 7 files changed, 97 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/configuration/server/src/main/resources/PBSTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/PBSTemplate.xslt b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
index 033534e..559c30e 100644
--- a/modules/configuration/server/src/main/resources/PBSTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
@@ -25,12 +25,7 @@
 #PBS -N <xsl:value-of select="ns:jobName"/>
         </xsl:when>
     </xsl:choose>
-    <xsl:choose>
-    <xsl:when test="ns:mailOptions != ''">
-#PBS -m <xsl:value-of select="ns:mailOptions"/>
-    </xsl:when>
-    </xsl:choose>
-    <xsl:choose>
+#PBS -m abe   <xsl:choose>
     <xsl:when test="ns:mailAddress != ''">
 #PBS -M <xsl:value-of select="ns:mailAddress"/>
     </xsl:when>

http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
index ecafbc2..a2e274d 100644
--- a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
@@ -31,15 +31,10 @@
         </xsl:choose>
     <xsl:choose>
     <xsl:when test="ns:mailAddress != ''">
-#SBATCH -mail-user=<xsl:value-of select="ns:mailAddress"/>
+#SBATCH --mail-user=<xsl:value-of select="ns:mailAddress"/>
     </xsl:when>
     </xsl:choose>
-    <xsl:choose>
-    <xsl:when test="ns:mailType != ''">
-#SBATCH -mail-type=<xsl:value-of select="ns:mailType"/>
-    </xsl:when>
-    </xsl:choose>
-    <xsl:choose>
+#SBATCH --mail-type=ALL   <xsl:choose>
 <xsl:when test="ns:acountString != ''">
 #SBATCH -A <xsl:value-of select="ns:acountString"/>
     </xsl:when>

http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
index f798b2f..8adb860 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/EmailBasedMonitor.java
@@ -22,6 +22,8 @@ package org.apache.airavata.gfac.core.monitor.mail;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -30,9 +32,10 @@ import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
 import org.apache.airavata.gfac.core.monitor.mail.parser.EmailParser;
 import org.apache.airavata.gfac.core.monitor.mail.parser.PBSEmailParser;
 import org.apache.airavata.gfac.core.monitor.mail.parser.SLURMEmailParser;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.JobStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.mail.Address;
 import javax.mail.Flags;
@@ -48,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class EmailBasedMonitor implements Runnable{
 
-    private static final Logger log = LoggerFactory.getLogger(EmailBasedMonitor.class);
+    private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
 
     private static final String PBS_CONSULT_SDSC_EDU = "pbsconsult@sdsc.edu";
     private static final String SLURM_BATCH_STAMPEDE = "slurm@batch1.stampede.tacc.utexas.edu";
@@ -136,9 +139,9 @@ public class EmailBasedMonitor implements Runnable{
                 for (Message message : searchMessages) {
                     try {
                         JobStatusResult jobStatusResult = parse(message);
-                        updateJobStatus(jobStatusResult);
+                        process(jobStatusResult);
                     } catch (AiravataException e) {
-                        log.error("Error parsing email message =====================================>");
+                        log.error("Error parsing email message =====================================>", e);
                         try {
                             writeEnvelopeOnError(message);
                         } catch (MessagingException e1) {
@@ -164,13 +167,41 @@ public class EmailBasedMonitor implements Runnable{
         }
     }
 
-    private void updateJobStatus(JobStatusResult jobStatusResult) throws AiravataException {
-        JobExecutionContext jEC = jobMonitorMap.remove(jobStatusResult.getJobId());
+    private void process(JobStatusResult jobStatusResult) throws AiravataException {
+        JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
         if (jEC == null) {
             throw new AiravataException("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
         }
-        jEC.getJobDetails().setJobStatus(new JobStatus(jobStatusResult.getState()));
-        GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher));
+        JobState resultState = jobStatusResult.getState();
+        jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
+        if (resultState == JobState.COMPLETE) {
+            GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher));
+        }else if (resultState == JobState.QUEUED) {
+            // TODO - publish queued rabbitmq message
+        }else if (resultState == JobState.FAILED) {
+            // TODO - handle failed scenario
+            jobMonitorMap.remove(jobStatusResult.getJobId());
+            log.info("Job failed email received , removed job from job monitoring");
+//            monitorPublisher.publish(jEC.getJobDetails().getJobStatus());
+        }
+        publishJobStatusChange(jEC);
+    }
+
+    private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {
+        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+        JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+                jobExecutionContext.getTaskData().getTaskID(),
+                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                jobExecutionContext.getExperimentID(),
+                jobExecutionContext.getGatewayID());
+        jobStatus.setJobIdentity(jobIdentity);
+        jobStatus.setState(jobExecutionContext.getJobDetails().getJobStatus().getJobState());
+        // we have this JobStatus class to handle amqp monitoring
+        log.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
+                        "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
+                jobStatus.getJobIdentity().getTaskId());
+
+        monitorPublisher.publish(jobStatus);
     }
 
     private void writeEnvelopeOnError(Message m) throws MessagingException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
index 64c78c1..37d2317 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/PBSEmailParser.java
@@ -38,8 +38,10 @@ public class PBSEmailParser implements EmailParser {
 
     private static final String STATUS = "status";
     private static final String JOBID = "jobId";
+    private static final String EXIT_STATUS = "exitStatus";
     private static final String REGEX = "[a-zA-Z: ]*(?<" + JOBID + ">[a-zA-Z0-9-\\.]*)\\s+.*\\s+.*\\s+(?<"
             + STATUS + ">[a-zA-Z\\ ]*)";
+    private static final String REGEX_EXIT_STATUS = "Exit_status=(?<" + EXIT_STATUS + ">[\\d]+)";
 
     @Override
     public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
@@ -51,7 +53,7 @@ public class PBSEmailParser implements EmailParser {
             if (matcher.find()) {
                 jobStatusResult.setJobId(matcher.group(JOBID));
                 String statusLine = matcher.group(STATUS);
-                jobStatusResult.setState(getJobState(statusLine));
+                jobStatusResult.setState(getJobState(statusLine, content));
                 return jobStatusResult;
             } else {
                 log.error("No matched found for content => \n" + content);
@@ -63,17 +65,37 @@ public class PBSEmailParser implements EmailParser {
         return null;
     }
 
-    private JobState getJobState(String statusLine) {
+    private JobState getJobState(String statusLine, String content) {
         switch (statusLine) {
             case "Begun execution":
                 return JobState.QUEUED;
             case "Execution terminated":
-                return JobState.COMPLETE;
+                int exitStatus = getExitStatus(content);
+                switch (exitStatus) {
+                    case 0:
+                        return JobState.COMPLETE;
+                    case 1:
+                        return JobState.FAILED;
+                    default:
+                        return JobState.UNKNOWN;
+                }
             default:
                 return JobState.UNKNOWN;
         }
     }
 
+    private int getExitStatus(String content) {
+        Pattern pattern = Pattern.compile(REGEX_EXIT_STATUS);
+        Matcher matcher = pattern.matcher(content);
+        if (matcher.find()) {
+            String group = matcher.group(EXIT_STATUS);
+            if (group != null && !group.trim().isEmpty()) {
+                return Integer.valueOf(group.trim());
+            }
+        }
+        return -1;
+    }
+
 
 
 /*    -----------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
index 8c6dbed..e5af721 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/mail/parser/SLURMEmailParser.java
@@ -61,7 +61,7 @@ public class SLURMEmailParser implements EmailParser {
     }
 
     private JobState getJobState(String state) {
-        switch (state) {
+        switch (state.trim()) {
             case "Began":
                 return JobState.QUEUED;
             case "Ended":

http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index f628b4e..0012ce1 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -385,7 +385,8 @@ public class SSHProvider extends AbstractProvider {
                     ", execution is configured as asynchronous, so Outhandler will not be invoked");
         }*/
         try {
-            EmailBasedMonitor.getInstant(((MonitorPublisher) jobExecutionContext.getProperty("MonitorPubliser")));
+            EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(((MonitorPublisher) jobExecutionContext.getProperty("MonitorPublisher")));
+            emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
         } catch (ApplicationSettingsException e) {
             throw new GFacHandlerException("Error while delegating job execution context to email based monitor");
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/cd6f1d98/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 588183e..9628585 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -290,25 +290,32 @@ public class GFACSSHUtils {
             }
         }
         try {
-			if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")){
-				jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS));
-				String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
-			
-				if(taskData.isEnableEmailNotification()){
-					List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
-					String elist = GFacUtils.listToCsv(emailList, ',');
-					if(emailids != null && !emailids.isEmpty()){
-						emailids = emailids +"," + elist;
-					}else{
-						emailids = elist;
-					}
-				}
-				if(emailids != null && !emailids.isEmpty()){
-					logger.info("Email list: "+ emailids);
-					jobDescriptor.setMailAddress(emailids);
-				}
-			}
-		} catch (ApplicationSettingsException e) {
+			if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+                String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+                if (flags != null && jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+                    flags = "ALL";
+                }
+                jobDescriptor.setMailOptions(flags);
+
+                String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+
+                if (taskData.isEnableEmailNotification()) {
+                    List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
+                    String elist = GFacUtils.listToCsv(emailList, ',');
+                    if (elist != null && !elist.isEmpty()) {
+                        if (emailids != null && !emailids.isEmpty()) {
+                            emailids = emailids + "," + elist;
+                        } else {
+                            emailids = elist;
+                        }
+                    }
+                }
+                if (emailids != null && !emailids.isEmpty()) {
+                    logger.info("Email list: " + emailids);
+                    jobDescriptor.setMailAddress(emailids);
+                }
+            }
+        } catch (ApplicationSettingsException e) {
 			 logger.error("ApplicationSettingsException : " +e.getLocalizedMessage());
 		}
         // this is common for any application descriptor