You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/12 19:59:33 UTC
airavata git commit: Added monitor method to GfacProvider interface
and implement monitor job method in BetterGfacImpl
Repository: airavata
Updated Branches:
refs/heads/master 113b75fc6 -> 258e06e1c
Added monitor method to GfacProvider interface and implement monitor job method in BetterGfacImpl
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/258e06e1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/258e06e1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/258e06e1
Branch: refs/heads/master
Commit: 258e06e1ccb38b41e318efecf26395c3c7206fb9
Parents: 113b75f
Author: shamrath <sh...@gmail.com>
Authored: Tue May 12 13:59:30 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Tue May 12 13:59:30 2015 -0400
----------------------------------------------------------------------
.../gfac/bes/provider/impl/BESProvider.java | 5 +
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 15 ++-
.../gfac/core/provider/GFacProvider.java | 8 ++
.../org/apache/airavata/job/TestProvider.java | 5 +
.../gsissh/provider/impl/GSISSHProvider.java | 123 ++++++++++---------
.../gfac/local/provider/impl/LocalProvider.java | 5 +
.../gfac/ssh/provider/impl/SSHProvider.java | 85 +++++++------
7 files changed, 143 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index d505671..2f6add6 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -418,6 +418,11 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
// TODO: Auto generated method body.
}
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Not implemented yet; this auto-generated stub performs no monitoring.
+ }
+
protected void waitUntilDone(FactoryClient factory, EndpointReferenceType activityEpr, JobDetails jobDetails) throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index dd82fa7..55894a3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -669,7 +669,9 @@ public class BetterGfacImpl implements GFac,Watcher {
break;
case OUTHANDLERSINVOKED:
case COMPLETED:
+ GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.COMPLETED);
case FAILED:
+ GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED);
case UNKNOWN:
log.info("All output handlers are invoked successfully, ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
break;
@@ -705,8 +707,15 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void monitorJob(JobExecutionContext jobExecutionContext) {
- // TODO - Auto generated message.
+ private void monitorJob(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException {
+ GFacProvider provider = jobExecutionContext.getProvider();
+ if (provider != null) {
+ provider.monitor(jobExecutionContext);
+ }
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+
}
private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
@@ -1028,8 +1037,8 @@ public class BetterGfacImpl implements GFac,Watcher {
log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
break;
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
}
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
} catch (Exception e) {
throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
index 6006309..e031980 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
@@ -66,4 +66,12 @@ public interface GFacProvider{
* @param jobExecutionContext
*/
public void recover(JobExecutionContext jobExecutionContext)throws GFacProviderException,GFacException;
+
+ /**
+ * Starts monitoring of the job described by the given context.
+ *
+ * FIXME: Added this method as a quick fix; the monitor logic needs to be removed from the provider and invoked separately.
+ *
+ * @param jobExecutionContext context of the job that should be monitored
+ * @throws GFacProviderException declared for provider-specific failures of the implementation
+ * @throws GFacException declared for general GFac failures of the implementation
+ */
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
index 3829dae..5881203 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
@@ -38,6 +38,11 @@ public class TestProvider extends AbstractProvider {
// TODO: Auto generated method body.
}
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Not implemented yet; this auto-generated stub performs no monitoring.
+ }
+
public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
//To change body of implemented methods use File | Settings | File Templates.
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index c35445b..32b3f93 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -95,9 +95,6 @@ public class GSISSHProvider extends AbstractProvider {
Cluster cluster = null;
try {
- AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
- SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
- jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) {
cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster();
}
@@ -126,7 +123,7 @@ public class GSISSHProvider extends AbstractProvider {
// Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
// to perform monitoring, daemon handlers can be accessed from anywhere
- delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission , jobDetails.getJobID());
+ monitor(jobExecutionContext);
// we know this host is type GsiSSHHostType
} catch (Exception e) {
String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
@@ -145,58 +142,6 @@ public class GSISSHProvider extends AbstractProvider {
}
- public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException, AppCatalogException {
- if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
- try {
- EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
- sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
- emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
- } catch (AiravataException e) {
- throw new GFacHandlerException("Error while activating email job monitoring ", e);
- }
- return;
- }
- }
-
- // if email monitor is not activeated or not configure we use pull or push monitor
- List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
- pullMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
- " to handle by the GridPullMonitorHandler");
- }
- } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pushMonitorHandler = threadedHandler;
- if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
- log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
- pushMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
- " to handle by the GridPushMonitorHandler");
- }
- }
- // have to handle the GridPushMonitorHandler logic
- }
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
-
- }
- }
-
public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
if (daemonHandlers == null) {
@@ -339,13 +284,71 @@ public class GSISSHProvider extends AbstractProvider {
throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
}
- AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
- SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
- jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
- delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission, jobId);
+ monitor(jobExecutionContext);
} catch (Exception e) {
log.error("Error while recover the job", e);
throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
}
}
+
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = null;
+ try {
+ sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ } catch (AppCatalogException e) {
+ throw new GFacException("Error while reading compute resource", e);
+ }
+ if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ try {
+ EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
+ sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+ emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+ } catch (AiravataException e) {
+ throw new GFacHandlerException("Error while activating email job monitoring ", e);
+ }
+ return;
+ }
+ }
+
+ // if email monitor is not activeated or not configure we use pull or push monitor
+ List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ String jobID = jobExecutionContext.getJobDetails().getJobID();
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
+ pullMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+ " to handle by the GridPullMonitorHandler");
+ }
+ } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pushMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+ log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
+ pushMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+ " to handle by the GridPushMonitorHandler");
+ }
+ }
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index df932b4..0bdf190 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -257,6 +257,11 @@ public class LocalProvider extends AbstractProvider {
// TODO: Auto generated method body.
}
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Not implemented yet; this auto-generated stub performs no monitoring.
+ }
+
private void buildCommand() {
cmdList.add(jobExecutionContext.getExecutablePath());
http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 676e045..485029f 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -186,7 +186,7 @@ public class SSHProvider extends AbstractProvider {
}
data.append("jobDesc=").append(jobDescriptor.toXML());
data.append(",jobId=").append(jobDetails.getJobID());
- delegateToMonitorHandlers(jobExecutionContext);
+ monitor(jobExecutionContext);
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
@@ -403,44 +403,6 @@ public class SSHProvider extends AbstractProvider {
return stdOutputString;
}
- public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException, AppCatalogException {
- if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
- String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
- SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
- try {
- EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
- sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
- emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
- } catch (AiravataException e) {
- throw new GFacHandlerException("Error while activating email job monitoring ", e);
- }
- return;
- }
- }
-
- // if email monitor is not activeated or not configure we use pull or push monitor
- List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- pullMonitorHandler.invoke(jobExecutionContext);
- }
- // have to handle the GridPushMonitorHandler logic
- }
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
- }
- }
-
-
public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
// have to implement the logic to recover a gfac failure
initialize(jobExecutionContext);
@@ -498,7 +460,7 @@ public class SSHProvider extends AbstractProvider {
throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
}
- delegateToMonitorHandlers(jobExecutionContext);
+ monitor(jobExecutionContext);
} catch (Exception e) {
log.error("Error while recover the job", e);
throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
@@ -508,4 +470,47 @@ public class SSHProvider extends AbstractProvider {
this.execute(jobExecutionContext);
}
}
+
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = null;
+ try {
+ sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ } catch (AppCatalogException e) {
+ throw new GFacException("Error while reading compute resource", e);
+ }
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ try {
+ EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
+ sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+ emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+ } catch (AiravataException e) {
+ throw new GFacHandlerException("Error while activating email job monitoring ", e);
+ }
+ return;
+ }
+ }
+
+ // if email monitor is not activeated or not configure we use pull or push monitor
+ List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ pullMonitorHandler.invoke(jobExecutionContext);
+ }
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+ }
+ }
}