You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/16 22:06:13 UTC
[1/2] airavata git commit: Updated email based monitoring to work with ProcessContext
Repository: airavata
Updated Branches:
refs/heads/master a2b6bdfd9 -> d05c0a166
Updated email based monitoring to work with ProcessContext
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/754bc5c4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/754bc5c4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/754bc5c4
Branch: refs/heads/master
Commit: 754bc5c4e90dded69fe14233e5bad5dac5e7dbca
Parents: d9b2df0
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 16 16:05:47 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 16 16:05:47 2015 -0400
----------------------------------------------------------------------
.../gfac/core/monitor/JobStatusResult.java | 3 +-
.../gfac/monitor/email/EmailBasedMonitor.java | 103 +++++++------------
.../monitor/email/parser/LSFEmailParser.java | 2 +-
.../monitor/email/parser/PBSEmailParser.java | 2 +-
.../monitor/email/parser/SLURMEmailParser.java | 2 +-
.../monitor/email/parser/UGEEmailParser.java | 2 +-
6 files changed, 41 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
index 9c4fcc3..dfc77ac 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
@@ -20,7 +20,8 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import org.apache.airavata.model.experiment.JobState;
+
+import org.apache.airavata.model.status.JobState;
public class JobStatusResult {
private JobState state;
http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 5ef0e88..08f8423 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -21,29 +21,20 @@
package org.apache.airavata.gfac.monitor.email;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.gfac.core.monitor.EmailParser;
-import org.apache.airavata.gfac.impl.OutHandlerWorker;
+import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
-import org.apache.airavata.model.experiment.CorrectiveAction;
-import org.apache.airavata.model.experiment.ErrorCategory;
-import org.apache.airavata.model.experiment.JobState;
-import org.apache.airavata.model.experiment.JobStatus;
+import org.apache.airavata.model.status.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +67,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
private Store store;
private Folder emailFolder;
private Properties properties;
- private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>();
+ private Map<String, ProcessContext> jobMonitorMap = new ConcurrentHashMap<>();
private String host, emailAddress, password, storeProtocol, folderName ;
private Date monitorStartDate;
private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
@@ -99,18 +90,16 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
properties.put("mail.store.protocol", storeProtocol);
}
- public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
- String monitorId = jobExecutionContext.getJobDetails().getJobID();
- if (monitorId == null || monitorId.isEmpty()) {
- monitorId = jobExecutionContext.getJobDetails().getJobName();
- }
- addToJobMonitorMap(monitorId, jobExecutionContext);
- }
+ @Override
+ public void monitor(String jobId, ProcessContext processContext) {
+ log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
+ jobMonitorMap.put(jobId, processContext);
+ }
- public void addToJobMonitorMap(String monitorId, JobExecutionContext jobExecutionContext) {
- log.info("[EJM]: Added monitor Id : " + monitorId + " to email based monitor map");
- jobMonitorMap.put(monitorId, jobExecutionContext);
- }
+ @Override
+ public void stopMonitor(String jobId) {
+ jobMonitorMap.remove(jobId);
+ }
private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
Address fromAddress = message.getFrom()[0];
@@ -132,7 +121,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
emailParser = new UGEEmailParser();
break;
default:
- throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType.toString() + " for email monitoring --> " + addressStr);
+ throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType
+ .toString() + " for email monitoring --> " + addressStr);
}
emailParserMap.put(jobMonitorType, emailParser);
@@ -218,12 +208,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
for (Message message : searchMessages) {
try {
JobStatusResult jobStatusResult = parse(message);
- JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
- if (jEC == null) {
- jEC = jobMonitorMap.get(jobStatusResult.getJobName());
+ ProcessContext processContext = jobMonitorMap.get(jobStatusResult.getJobId());
+ if (processContext == null) {
+ processContext = jobMonitorMap.get(jobStatusResult.getJobName());
}
- if (jEC != null) {
- process(jobStatusResult, jEC);
+ if (processContext != null) {
+ process(jobStatusResult, processContext);
processedMessages.add(message);
} else {
// we can get JobExecutionContext null in multiple Gfac instances environment,
@@ -272,15 +262,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
}
- private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){
+ private void process(JobStatusResult jobStatusResult, ProcessContext processContext){
JobState resultState = jobStatusResult.getState();
- jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
- boolean runOutHandlers = false;
+ // TODO : update job state on process context
+ boolean runOutflowTasks = false;
String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
// TODO - Handle all other valid JobStates
if (resultState == JobState.COMPLETE) {
jobMonitorMap.remove(jobStatusResult.getJobId());
- runOutHandlers = true;
+ runOutflowTasks = true;
log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails);
}else if (resultState == JobState.QUEUED) {
// nothing special thing to do, update the status change to rabbit mq at the end of this method.
@@ -290,44 +280,29 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
log.info("[EJM]: Job Active email received, " + jobDetails);
}else if (resultState == JobState.FAILED) {
jobMonitorMap.remove(jobStatusResult.getJobId());
- runOutHandlers = true;
+ runOutflowTasks = true;
log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
- try {
- GFacUtils.saveErrorDetails(jEC, "Job runs on remote compute resource failed", CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.APPLICATION_FAILURE);
- } catch (GFacException e) {
- log.info("[EJM]: Error while saving error details for jobId:{}, expId: {}", jEC.getJobDetails().getJobID(), jEC.getExperimentID());
- }
}else if (resultState == JobState.CANCELED) {
jobMonitorMap.remove(jobStatusResult.getJobId());
- runOutHandlers = false; // Do we need to run out handlers in canceled case?
+ runOutflowTasks = false; // Do we need to run out handlers in canceled case?
log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails);
}
log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
- publishJobStatusChange(jEC);
+ publishJobStatusChange(processContext);
- if (runOutHandlers) {
+ if (runOutflowTasks) {
log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC));
+ try {
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+ } catch (GFacException e) {
+ log.info("[EJM]: Error while running output tasks", e);
+ }
}
}
- private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {
- JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
- JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
- jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- jobStatus.setJobIdentity(jobIdentity);
- jobStatus.setState(jobExecutionContext.getJobDetails().getJobStatus().getJobState());
- // we have this JobStatus class to handle amqp monitoring
- log.debugId(jobStatus.getJobIdentity().getJobId(), "[EJM]: Published job status(" +
- jobExecutionContext.getJobDetails().getJobStatus().getJobState().toString() + ") change request, " +
- "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
- jobStatus.getJobIdentity().getTaskId());
-
- jobExecutionContext.getLocalEventPublisher().publish(jobStatus);
+ private void publishJobStatusChange(ProcessContext processContext) {
+ // TODO : implement this
}
private void writeEnvelopeOnError(Message m) throws MessagingException {
@@ -355,13 +330,5 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
this.monitorStartDate = date;
}
- @Override
- public void monitor(String jobId, ProcessContext processContext) {
- }
-
- @Override
- public void stopMonitor(String jobId) {
-
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
index dc7999c..d6e396e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
index 6f2446b..3879daf 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
index db7521c..3b0e32a 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
index e147d73..266456e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.monitor.EmailParser;
import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
[2/2] airavata git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d05c0a16
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d05c0a16
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d05c0a16
Branch: refs/heads/master
Commit: d05c0a1669fa704e13d299b9c0f879a3370a8d1e
Parents: 754bc5c a2b6bdf
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 16 16:06:06 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 16 16:06:06 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacUtils.java | 891 ++++++++++++-------
.../gfac/core/context/ProcessContext.java | 66 ++
.../gfac/impl/task/JobSubmissionTaskImpl.java | 79 ++
3 files changed, 713 insertions(+), 323 deletions(-)
----------------------------------------------------------------------