You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/12 19:59:33 UTC

airavata git commit: Added monitor method to GfacProvider interface and implement monitor job method in BetterGfacImpl

Repository: airavata
Updated Branches:
  refs/heads/master 113b75fc6 -> 258e06e1c


Added monitor method to GfacProvider interface and implement monitor job method in BetterGfacImpl


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/258e06e1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/258e06e1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/258e06e1

Branch: refs/heads/master
Commit: 258e06e1ccb38b41e318efecf26395c3c7206fb9
Parents: 113b75f
Author: shamrath <sh...@gmail.com>
Authored: Tue May 12 13:59:30 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Tue May 12 13:59:30 2015 -0400

----------------------------------------------------------------------
 .../gfac/bes/provider/impl/BESProvider.java     |   5 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  15 ++-
 .../gfac/core/provider/GFacProvider.java        |   8 ++
 .../org/apache/airavata/job/TestProvider.java   |   5 +
 .../gsissh/provider/impl/GSISSHProvider.java    | 123 ++++++++++---------
 .../gfac/local/provider/impl/LocalProvider.java |   5 +
 .../gfac/ssh/provider/impl/SSHProvider.java     |  85 +++++++------
 7 files changed, 143 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index d505671..2f6add6 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -418,6 +418,11 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
         // TODO: Auto generated method body.
     }
 
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        // TODO: Auto generated method body.
+    }
+
     protected void waitUntilDone(FactoryClient factory, EndpointReferenceType activityEpr, JobDetails jobDetails) throws Exception {
 		
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index dd82fa7..55894a3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -669,7 +669,9 @@ public class BetterGfacImpl implements GFac,Watcher {
                     break;
                 case OUTHANDLERSINVOKED:
                 case COMPLETED:
+                    GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.COMPLETED);
                 case FAILED:
+                    GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED);
                 case UNKNOWN:
                     log.info("All output handlers are invoked successfully, ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
                     break;
@@ -705,8 +707,15 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void monitorJob(JobExecutionContext jobExecutionContext) {
-        // TODO - Auto generated message.
+    private void monitorJob(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException {
+        GFacProvider provider = jobExecutionContext.getProvider();
+        if (provider != null) {
+            provider.monitor(jobExecutionContext);
+        }
+        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+            invokeOutFlowHandlers(jobExecutionContext);
+        }
+
     }
 
     private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
@@ -1028,8 +1037,8 @@ public class BetterGfacImpl implements GFac,Watcher {
                         log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
                         break;
                     }
-                    monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
                 }
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
             } catch (Exception e) {
                 throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
index 6006309..e031980 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
@@ -66,4 +66,12 @@ public interface GFacProvider{
      * @param jobExecutionContext
      */
     public void recover(JobExecutionContext jobExecutionContext)throws GFacProviderException,GFacException;
+
+    /**
+     * FIXME: Added this method as a quick fix; the monitor logic needs to be removed from the provider and invoked separately.
+     * @param jobExecutionContext
+     * @throws GFacProviderException
+     * @throws GFacException
+     */
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
index 3829dae..5881203 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
@@ -38,6 +38,11 @@ public class TestProvider extends AbstractProvider {
         // TODO: Auto generated method body.
     }
 
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        // TODO: Auto generated method body.
+    }
+
     public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
         //To change body of implemented methods use File | Settings | File Templates.
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index c35445b..32b3f93 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -95,9 +95,6 @@ public class GSISSHProvider extends AbstractProvider {
         Cluster cluster = null;
 
         try {
-            AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
-            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
-                    jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
             if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) {
                 cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster();
             }
@@ -126,7 +123,7 @@ public class GSISSHProvider extends AbstractProvider {
 
             // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
             // to perform monitoring, daemon handlers can be accessed from anywhere
-            delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission , jobDetails.getJobID());
+            monitor(jobExecutionContext);
             // we know this host is type GsiSSHHostType
         } catch (Exception e) {
 		    String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
@@ -145,58 +142,6 @@ public class GSISSHProvider extends AbstractProvider {
           
     }
 
-    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException, AppCatalogException {
-        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
-            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
-            if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
-                try {
-                    EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
-                            sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
-                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
-                } catch (AiravataException e) {
-                    throw new GFacHandlerException("Error while activating email job monitoring ", e);
-                }
-                return;
-            }
-        }
-
-        // if email monitor is not activeated or not configure we use pull or push monitor
-        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-        if (daemonHandlers == null) {
-            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-        }
-        ThreadedHandler pullMonitorHandler = null;
-        ThreadedHandler pushMonitorHandler = null;
-        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
-        for (ThreadedHandler threadedHandler : daemonHandlers) {
-            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
-                pullMonitorHandler = threadedHandler;
-                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
-                    log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
-                    pullMonitorHandler.invoke(jobExecutionContext);
-                } else {
-                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
-                            " to handle by the GridPullMonitorHandler");
-                }
-            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
-                pushMonitorHandler = threadedHandler;
-                if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
-                    log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
-                    pushMonitorHandler.invoke(jobExecutionContext);
-                } else {
-                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
-                            " to handle by the GridPushMonitorHandler");
-                }
-            }
-            // have to handle the GridPushMonitorHandler logic
-        }
-        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
-            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
-                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
-
-        }
-    }
-
     public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
         List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
         if (daemonHandlers == null) {
@@ -339,13 +284,71 @@ public class GSISSHProvider extends AbstractProvider {
                     throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
                 }
             }
-            AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
-            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
-                    jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
-            delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission, jobId);
+            monitor(jobExecutionContext);
         } catch (Exception e) {
             log.error("Error while recover the job", e);
             throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
         }
     }
+
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+        SSHJobSubmission sshJobSubmission = null;
+        try {
+            sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+        } catch (AppCatalogException e) {
+            throw new GFacException("Error while reading compute resource", e);
+        }
+        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+            if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+                try {
+                    EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
+                            sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+                } catch (AiravataException e) {
+                    throw new GFacHandlerException("Error while activating email job monitoring ", e);
+                }
+                return;
+            }
+        }
+
+        // If the email monitor is not activated or not configured, we use the pull or push monitor.
+        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+        String jobID = jobExecutionContext.getJobDetails().getJobID();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+                    log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+                            " to handle by the GridPullMonitorHandler");
+                }
+            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pushMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+                    log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+                            " to handle by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
+
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index df932b4..0bdf190 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -257,6 +257,11 @@ public class LocalProvider extends AbstractProvider {
         // TODO: Auto generated method body.
     }
 
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        // TODO: Auto generated method body.
+    }
+
 
     private void buildCommand() {
         cmdList.add(jobExecutionContext.getExecutablePath());

http://git-wip-us.apache.org/repos/asf/airavata/blob/258e06e1/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 676e045..485029f 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -186,7 +186,7 @@ public class SSHProvider extends AbstractProvider {
                     }
                     data.append("jobDesc=").append(jobDescriptor.toXML());
                     data.append(",jobId=").append(jobDetails.getJobID());
-                    delegateToMonitorHandlers(jobExecutionContext);
+                    monitor(jobExecutionContext);
                 } catch (SSHApiException e) {
                     String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
                     log.error(error);
@@ -403,44 +403,6 @@ public class SSHProvider extends AbstractProvider {
         return stdOutputString;
     }
 
-    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException, AppCatalogException {
-        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
-            String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
-            SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
-            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
-            if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
-                try {
-                    EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
-                            sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
-                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
-                } catch (AiravataException e) {
-                    throw new GFacHandlerException("Error while activating email job monitoring ", e);
-                }
-                return;
-            }
-        }
-
-        // if email monitor is not activeated or not configure we use pull or push monitor
-        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-        if (daemonHandlers == null) {
-            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-        }
-        ThreadedHandler pullMonitorHandler = null;
-        ThreadedHandler pushMonitorHandler = null;
-        for (ThreadedHandler threadedHandler : daemonHandlers) {
-            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
-                pullMonitorHandler = threadedHandler;
-                pullMonitorHandler.invoke(jobExecutionContext);
-            }
-            // have to handle the GridPushMonitorHandler logic
-        }
-        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
-            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
-                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
-        }
-    }
-
-
     public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
         // have to implement the logic to recover a gfac failure
         initialize(jobExecutionContext);
@@ -498,7 +460,7 @@ public class SSHProvider extends AbstractProvider {
                         throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
                     }
                 }
-                delegateToMonitorHandlers(jobExecutionContext);
+                monitor(jobExecutionContext);
             } catch (Exception e) {
                 log.error("Error while recover the job", e);
                 throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
@@ -508,4 +470,47 @@ public class SSHProvider extends AbstractProvider {
             this.execute(jobExecutionContext);
         }
     }
+
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+            String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+            SSHJobSubmission sshJobSubmission = null;
+            try {
+                sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+            } catch (AppCatalogException e) {
+                throw new GFacException("Error while reading compute resource", e);
+            }
+            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+            if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+                try {
+                    EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
+                            sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+                } catch (AiravataException e) {
+                    throw new GFacHandlerException("Error while activating email job monitoring ", e);
+                }
+                return;
+            }
+        }
+
+        // If the email monitor is not activated or not configured, we use the pull or push monitor.
+        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                pullMonitorHandler.invoke(jobExecutionContext);
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
+        }
+    }
 }