You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/04/10 17:50:29 UTC

[5/5] airavata git commit: AIRAVATA-1666 - Added compute resource level email based monitoring check.

AIRAVATA-1666 - Added compute resource level email based monitoring check.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/93d4421b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/93d4421b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/93d4421b

Branch: refs/heads/emailMonitoring
Commit: 93d4421b7e5c0d32fc62ceae3d0d0c88cdaf7838
Parents: 042e25c
Author: shamrath <sh...@gmail.com>
Authored: Fri Apr 10 11:50:20 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri Apr 10 11:50:20 2015 -0400

----------------------------------------------------------------------
 .../lib/airavata/computeResourceModel_types.cpp |  12 +-
 .../lib/airavata/computeResourceModel_types.h   |  16 +--
 .../Model/AppCatalog/ComputeResource/Types.php  |  20 ++--
 .../model/appcatalog/computeresource/ttypes.py  |  18 +--
 .../client/samples/CreateLaunchExperiment.java  |  21 ++--
 .../computeresource/SSHJobSubmission.java       | 112 +++++++++----------
 .../computeResourceModel.thrift                 |   2 +-
 .../gsissh/provider/impl/GSISSHProvider.java    |  87 +++++++-------
 .../gfac/monitor/email/EmailBasedMonitor.java   |  61 ++++++----
 .../gfac/monitor/email/EmailMonitorFactory.java |  58 ++++++++++
 .../gfac/ssh/provider/impl/SSHProvider.java     |  58 ++++++----
 11 files changed, 279 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
index 8b83573..2a57b7d 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
@@ -1142,8 +1142,8 @@ uint32_t SSHJobSubmission::read(::apache::thrift::protocol::TProtocol* iprot) {
         break;
       case 7:
         if (ftype == ::apache::thrift::protocol::T_STRUCT) {
-          xfer += this->emailMonitor.read(iprot);
-          this->__isset.emailMonitor = true;
+          xfer += this->emailMonitorProperty.read(iprot);
+          this->__isset.emailMonitorProperty = true;
         } else {
           xfer += iprot->skip(ftype);
         }
@@ -1197,9 +1197,9 @@ uint32_t SSHJobSubmission::write(::apache::thrift::protocol::TProtocol* oprot) c
     xfer += oprot->writeI32((int32_t)this->monitorMode);
     xfer += oprot->writeFieldEnd();
   }
-  if (this->__isset.emailMonitor) {
-    xfer += oprot->writeFieldBegin("emailMonitor", ::apache::thrift::protocol::T_STRUCT, 7);
-    xfer += this->emailMonitor.write(oprot);
+  if (this->__isset.emailMonitorProperty) {
+    xfer += oprot->writeFieldBegin("emailMonitorProperty", ::apache::thrift::protocol::T_STRUCT, 7);
+    xfer += this->emailMonitorProperty.write(oprot);
     xfer += oprot->writeFieldEnd();
   }
   xfer += oprot->writeFieldStop();
@@ -1215,7 +1215,7 @@ void swap(SSHJobSubmission &a, SSHJobSubmission &b) {
   swap(a.alternativeSSHHostName, b.alternativeSSHHostName);
   swap(a.sshPort, b.sshPort);
   swap(a.monitorMode, b.monitorMode);
-  swap(a.emailMonitor, b.emailMonitor);
+  swap(a.emailMonitorProperty, b.emailMonitorProperty);
   swap(a.__isset, b.__isset);
 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
index 05a2dfd..2b72a04 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
@@ -647,11 +647,11 @@ class LOCALDataMovement {
 void swap(LOCALDataMovement &a, LOCALDataMovement &b);
 
 typedef struct _SSHJobSubmission__isset {
-  _SSHJobSubmission__isset() : alternativeSSHHostName(false), sshPort(true), monitorMode(false), emailMonitor(false) {}
+  _SSHJobSubmission__isset() : alternativeSSHHostName(false), sshPort(true), monitorMode(false), emailMonitorProperty(false) {}
   bool alternativeSSHHostName;
   bool sshPort;
   bool monitorMode;
-  bool emailMonitor;
+  bool emailMonitorProperty;
 } _SSHJobSubmission__isset;
 
 class SSHJobSubmission {
@@ -671,7 +671,7 @@ class SSHJobSubmission {
   std::string alternativeSSHHostName;
   int32_t sshPort;
   MonitorMode::type monitorMode;
-  EmailMonitorProperty emailMonitor;
+  EmailMonitorProperty emailMonitorProperty;
 
   _SSHJobSubmission__isset __isset;
 
@@ -702,9 +702,9 @@ class SSHJobSubmission {
     __isset.monitorMode = true;
   }
 
-  void __set_emailMonitor(const EmailMonitorProperty& val) {
-    emailMonitor = val;
-    __isset.emailMonitor = true;
+  void __set_emailMonitorProperty(const EmailMonitorProperty& val) {
+    emailMonitorProperty = val;
+    __isset.emailMonitorProperty = true;
   }
 
   bool operator == (const SSHJobSubmission & rhs) const
@@ -727,9 +727,9 @@ class SSHJobSubmission {
       return false;
     else if (__isset.monitorMode && !(monitorMode == rhs.monitorMode))
       return false;
-    if (__isset.emailMonitor != rhs.__isset.emailMonitor)
+    if (__isset.emailMonitorProperty != rhs.__isset.emailMonitorProperty)
       return false;
-    else if (__isset.emailMonitor && !(emailMonitor == rhs.emailMonitor))
+    else if (__isset.emailMonitorProperty && !(emailMonitorProperty == rhs.emailMonitorProperty))
       return false;
     return true;
   }

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
index e488087..22c46a9 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
@@ -1232,7 +1232,7 @@ class SSHJobSubmission {
   public $alternativeSSHHostName = null;
   public $sshPort = 22;
   public $monitorMode = null;
-  public $emailMonitor = null;
+  public $emailMonitorProperty = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -1263,7 +1263,7 @@ class SSHJobSubmission {
           'type' => TType::I32,
           ),
         7 => array(
-          'var' => 'emailMonitor',
+          'var' => 'emailMonitorProperty',
           'type' => TType::STRUCT,
           'class' => '\Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty',
           ),
@@ -1288,8 +1288,8 @@ class SSHJobSubmission {
       if (isset($vals['monitorMode'])) {
         $this->monitorMode = $vals['monitorMode'];
       }
-      if (isset($vals['emailMonitor'])) {
-        $this->emailMonitor = $vals['emailMonitor'];
+      if (isset($vals['emailMonitorProperty'])) {
+        $this->emailMonitorProperty = $vals['emailMonitorProperty'];
       }
     }
   }
@@ -1358,8 +1358,8 @@ class SSHJobSubmission {
           break;
         case 7:
           if ($ftype == TType::STRUCT) {
-            $this->emailMonitor = new \Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty();
-            $xfer += $this->emailMonitor->read($input);
+            $this->emailMonitorProperty = new \Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty();
+            $xfer += $this->emailMonitorProperty->read($input);
           } else {
             $xfer += $input->skip($ftype);
           }
@@ -1410,12 +1410,12 @@ class SSHJobSubmission {
       $xfer += $output->writeI32($this->monitorMode);
       $xfer += $output->writeFieldEnd();
     }
-    if ($this->emailMonitor !== null) {
-      if (!is_object($this->emailMonitor)) {
+    if ($this->emailMonitorProperty !== null) {
+      if (!is_object($this->emailMonitorProperty)) {
         throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
       }
-      $xfer += $output->writeFieldBegin('emailMonitor', TType::STRUCT, 7);
-      $xfer += $this->emailMonitor->write($output);
+      $xfer += $output->writeFieldBegin('emailMonitorProperty', TType::STRUCT, 7);
+      $xfer += $this->emailMonitorProperty->write($output);
       $xfer += $output->writeFieldEnd();
     }
     $xfer += $output->writeFieldStop();

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
index e635472..8b34be1 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
@@ -1216,7 +1216,7 @@ class SSHJobSubmission:
    - alternativeSSHHostName
    - sshPort
    - monitorMode
-   - emailMonitor
+   - emailMonitorProperty
   """
 
   thrift_spec = (
@@ -1227,17 +1227,17 @@ class SSHJobSubmission:
     (4, TType.STRING, 'alternativeSSHHostName', None, None, ), # 4
     (5, TType.I32, 'sshPort', None, 22, ), # 5
     (6, TType.I32, 'monitorMode', None, None, ), # 6
-    (7, TType.STRUCT, 'emailMonitor', (EmailMonitorProperty, EmailMonitorProperty.thrift_spec), None, ), # 7
+    (7, TType.STRUCT, 'emailMonitorProperty', (EmailMonitorProperty, EmailMonitorProperty.thrift_spec), None, ), # 7
   )
 
-  def __init__(self, jobSubmissionInterfaceId=thrift_spec[1][4], securityProtocol=None, resourceJobManager=None, alternativeSSHHostName=None, sshPort=thrift_spec[5][4], monitorMode=None, emailMonitor=None,):
+  def __init__(self, jobSubmissionInterfaceId=thrift_spec[1][4], securityProtocol=None, resourceJobManager=None, alternativeSSHHostName=None, sshPort=thrift_spec[5][4], monitorMode=None, emailMonitorProperty=None,):
     self.jobSubmissionInterfaceId = jobSubmissionInterfaceId
     self.securityProtocol = securityProtocol
     self.resourceJobManager = resourceJobManager
     self.alternativeSSHHostName = alternativeSSHHostName
     self.sshPort = sshPort
     self.monitorMode = monitorMode
-    self.emailMonitor = emailMonitor
+    self.emailMonitorProperty = emailMonitorProperty
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1281,8 +1281,8 @@ class SSHJobSubmission:
           iprot.skip(ftype)
       elif fid == 7:
         if ftype == TType.STRUCT:
-          self.emailMonitor = EmailMonitorProperty()
-          self.emailMonitor.read(iprot)
+          self.emailMonitorProperty = EmailMonitorProperty()
+          self.emailMonitorProperty.read(iprot)
         else:
           iprot.skip(ftype)
       else:
@@ -1319,9 +1319,9 @@ class SSHJobSubmission:
       oprot.writeFieldBegin('monitorMode', TType.I32, 6)
       oprot.writeI32(self.monitorMode)
       oprot.writeFieldEnd()
-    if self.emailMonitor is not None:
-      oprot.writeFieldBegin('emailMonitor', TType.STRUCT, 7)
-      self.emailMonitor.write(oprot)
+    if self.emailMonitorProperty is not None:
+      oprot.writeFieldBegin('emailMonitorProperty', TType.STRUCT, 7)
+      self.emailMonitorProperty.write(oprot)
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index d173a0b..9ad71f4 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,7 +61,7 @@ public class CreateLaunchExperiment {
     private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60";
     private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
-    private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926";
+    private static String amberAppId = "Amber_717cba99-1085-45de-861c-952001c5243c";
     private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
     private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
     private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6";
@@ -168,13 +168,13 @@ public class CreateLaunchExperiment {
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //               final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
-                final String expId = createExperimentEchoForLocalHost(airavataClient);
-                experimentIds.add(expId);
+//                final String expId = createExperimentEchoForLocalHost(airavataClient);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
 //                final String expId = createExperimentWRFStampede(airavataClient);
-//                final String expId = createExperimentForStampedeAmber(airavataClient);
+                final String expId = createExperimentForStampedeAmber(airavataClient);
+//                String expId = createExperimentForTrestlesAmber(airavataClient);
 //                final String expId = createExperimentGROMACSStampede(airavataClient);
 //                final String expId = createExperimentESPRESSOStampede(airavataClient);
 //                final String expId = createExperimentLAMMPSStampede(airavataClient);
@@ -184,6 +184,7 @@ public class CreateLaunchExperiment {
 //                final String expId = createExperimentForLSF(airavataClient);
 //                final String expId = createExperimentLAMMPSForLSF(airavataClient);
 //            	  final String expId = "Ultrascan_ln_eb029947-391a-4ccf-8ace-9bafebe07cc0";
+                experimentIds.add(expId);
                 System.out.println("Experiment ID : " + expId);
 //                updateExperiment(airavata, expId);
 
@@ -1312,11 +1313,11 @@ public class CreateLaunchExperiment {
 //			}
             for (InputDataObjectType inputDataObjectType : exInputs) {
                 if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
-                    inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst");
+                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
-                    inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in");
+                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
-                    inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop");
+                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
                 }
             }
 
@@ -1377,11 +1378,11 @@ public class CreateLaunchExperiment {
 //			}
             for (InputDataObjectType inputDataObjectType : exInputs) {
                 if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
-                    inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst");
+                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
-                    inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in");
+                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
-                    inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop");
+                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
                 }
             }
             List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
index a9bf275..66f33ab 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField ALTERNATIVE_SSHHOST_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("alternativeSSHHostName", org.apache.thrift.protocol.TType.STRING, (short)4);
   private static final org.apache.thrift.protocol.TField SSH_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("sshPort", org.apache.thrift.protocol.TType.I32, (short)5);
   private static final org.apache.thrift.protocol.TField MONITOR_MODE_FIELD_DESC = new org.apache.thrift.protocol.TField("monitorMode", org.apache.thrift.protocol.TType.I32, (short)6);
-  private static final org.apache.thrift.protocol.TField EMAIL_MONITOR_FIELD_DESC = new org.apache.thrift.protocol.TField("emailMonitor", org.apache.thrift.protocol.TType.STRUCT, (short)7);
+  private static final org.apache.thrift.protocol.TField EMAIL_MONITOR_PROPERTY_FIELD_DESC = new org.apache.thrift.protocol.TField("emailMonitorProperty", org.apache.thrift.protocol.TType.STRUCT, (short)7);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
   private String alternativeSSHHostName; // optional
   private int sshPort; // optional
   private MonitorMode monitorMode; // optional
-  private EmailMonitorProperty emailMonitor; // optional
+  private EmailMonitorProperty emailMonitorProperty; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
      * @see MonitorMode
      */
     MONITOR_MODE((short)6, "monitorMode"),
-    EMAIL_MONITOR((short)7, "emailMonitor");
+    EMAIL_MONITOR_PROPERTY((short)7, "emailMonitorProperty");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -126,8 +126,8 @@ import org.slf4j.LoggerFactory;
           return SSH_PORT;
         case 6: // MONITOR_MODE
           return MONITOR_MODE;
-        case 7: // EMAIL_MONITOR
-          return EMAIL_MONITOR;
+        case 7: // EMAIL_MONITOR_PROPERTY
+          return EMAIL_MONITOR_PROPERTY;
         default:
           return null;
       }
@@ -170,7 +170,7 @@ import org.slf4j.LoggerFactory;
   // isset id assignments
   private static final int __SSHPORT_ISSET_ID = 0;
   private byte __isset_bitfield = 0;
-  private _Fields optionals[] = {_Fields.ALTERNATIVE_SSHHOST_NAME,_Fields.SSH_PORT,_Fields.MONITOR_MODE,_Fields.EMAIL_MONITOR};
+  private _Fields optionals[] = {_Fields.ALTERNATIVE_SSHHOST_NAME,_Fields.SSH_PORT,_Fields.MONITOR_MODE,_Fields.EMAIL_MONITOR_PROPERTY};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -186,7 +186,7 @@ import org.slf4j.LoggerFactory;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.MONITOR_MODE, new org.apache.thrift.meta_data.FieldMetaData("monitorMode", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, MonitorMode.class)));
-    tmpMap.put(_Fields.EMAIL_MONITOR, new org.apache.thrift.meta_data.FieldMetaData("emailMonitor", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+    tmpMap.put(_Fields.EMAIL_MONITOR_PROPERTY, new org.apache.thrift.meta_data.FieldMetaData("emailMonitorProperty", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, EmailMonitorProperty.class)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(SSHJobSubmission.class, metaDataMap);
@@ -231,8 +231,8 @@ import org.slf4j.LoggerFactory;
     if (other.isSetMonitorMode()) {
       this.monitorMode = other.monitorMode;
     }
-    if (other.isSetEmailMonitor()) {
-      this.emailMonitor = new EmailMonitorProperty(other.emailMonitor);
+    if (other.isSetEmailMonitorProperty()) {
+      this.emailMonitorProperty = new EmailMonitorProperty(other.emailMonitorProperty);
     }
   }
 
@@ -250,7 +250,7 @@ import org.slf4j.LoggerFactory;
     this.sshPort = 22;
 
     this.monitorMode = null;
-    this.emailMonitor = null;
+    this.emailMonitorProperty = null;
   }
 
   public String getJobSubmissionInterfaceId() {
@@ -406,26 +406,26 @@ import org.slf4j.LoggerFactory;
     }
   }
 
-  public EmailMonitorProperty getEmailMonitor() {
-    return this.emailMonitor;
+  public EmailMonitorProperty getEmailMonitorProperty() {
+    return this.emailMonitorProperty;
   }
 
-  public void setEmailMonitor(EmailMonitorProperty emailMonitor) {
-    this.emailMonitor = emailMonitor;
+  public void setEmailMonitorProperty(EmailMonitorProperty emailMonitorProperty) {
+    this.emailMonitorProperty = emailMonitorProperty;
   }
 
-  public void unsetEmailMonitor() {
-    this.emailMonitor = null;
+  public void unsetEmailMonitorProperty() {
+    this.emailMonitorProperty = null;
   }
 
-  /** Returns true if field emailMonitor is set (has been assigned a value) and false otherwise */
-  public boolean isSetEmailMonitor() {
-    return this.emailMonitor != null;
+  /** Returns true if field emailMonitorProperty is set (has been assigned a value) and false otherwise */
+  public boolean isSetEmailMonitorProperty() {
+    return this.emailMonitorProperty != null;
   }
 
-  public void setEmailMonitorIsSet(boolean value) {
+  public void setEmailMonitorPropertyIsSet(boolean value) {
     if (!value) {
-      this.emailMonitor = null;
+      this.emailMonitorProperty = null;
     }
   }
 
@@ -479,11 +479,11 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
-    case EMAIL_MONITOR:
+    case EMAIL_MONITOR_PROPERTY:
       if (value == null) {
-        unsetEmailMonitor();
+        unsetEmailMonitorProperty();
       } else {
-        setEmailMonitor((EmailMonitorProperty)value);
+        setEmailMonitorProperty((EmailMonitorProperty)value);
       }
       break;
 
@@ -510,8 +510,8 @@ import org.slf4j.LoggerFactory;
     case MONITOR_MODE:
       return getMonitorMode();
 
-    case EMAIL_MONITOR:
-      return getEmailMonitor();
+    case EMAIL_MONITOR_PROPERTY:
+      return getEmailMonitorProperty();
 
     }
     throw new IllegalStateException();
@@ -536,8 +536,8 @@ import org.slf4j.LoggerFactory;
       return isSetSshPort();
     case MONITOR_MODE:
       return isSetMonitorMode();
-    case EMAIL_MONITOR:
-      return isSetEmailMonitor();
+    case EMAIL_MONITOR_PROPERTY:
+      return isSetEmailMonitorProperty();
     }
     throw new IllegalStateException();
   }
@@ -609,12 +609,12 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
-    boolean this_present_emailMonitor = true && this.isSetEmailMonitor();
-    boolean that_present_emailMonitor = true && that.isSetEmailMonitor();
-    if (this_present_emailMonitor || that_present_emailMonitor) {
-      if (!(this_present_emailMonitor && that_present_emailMonitor))
+    boolean this_present_emailMonitorProperty = true && this.isSetEmailMonitorProperty();
+    boolean that_present_emailMonitorProperty = true && that.isSetEmailMonitorProperty();
+    if (this_present_emailMonitorProperty || that_present_emailMonitorProperty) {
+      if (!(this_present_emailMonitorProperty && that_present_emailMonitorProperty))
         return false;
-      if (!this.emailMonitor.equals(that.emailMonitor))
+      if (!this.emailMonitorProperty.equals(that.emailMonitorProperty))
         return false;
     }
 
@@ -694,12 +694,12 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
-    lastComparison = Boolean.valueOf(isSetEmailMonitor()).compareTo(other.isSetEmailMonitor());
+    lastComparison = Boolean.valueOf(isSetEmailMonitorProperty()).compareTo(other.isSetEmailMonitorProperty());
     if (lastComparison != 0) {
       return lastComparison;
     }
-    if (isSetEmailMonitor()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.emailMonitor, other.emailMonitor);
+    if (isSetEmailMonitorProperty()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.emailMonitorProperty, other.emailMonitorProperty);
       if (lastComparison != 0) {
         return lastComparison;
       }
@@ -773,13 +773,13 @@ import org.slf4j.LoggerFactory;
       }
       first = false;
     }
-    if (isSetEmailMonitor()) {
+    if (isSetEmailMonitorProperty()) {
       if (!first) sb.append(", ");
-      sb.append("emailMonitor:");
-      if (this.emailMonitor == null) {
+      sb.append("emailMonitorProperty:");
+      if (this.emailMonitorProperty == null) {
         sb.append("null");
       } else {
-        sb.append(this.emailMonitor);
+        sb.append(this.emailMonitorProperty);
       }
       first = false;
     }
@@ -805,8 +805,8 @@ import org.slf4j.LoggerFactory;
     if (resourceJobManager != null) {
       resourceJobManager.validate();
     }
-    if (emailMonitor != null) {
-      emailMonitor.validate();
+    if (emailMonitorProperty != null) {
+      emailMonitorProperty.validate();
     }
   }
 
@@ -895,11 +895,11 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
-          case 7: // EMAIL_MONITOR
+          case 7: // EMAIL_MONITOR_PROPERTY
             if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
-              struct.emailMonitor = new EmailMonitorProperty();
-              struct.emailMonitor.read(iprot);
-              struct.setEmailMonitorIsSet(true);
+              struct.emailMonitorProperty = new EmailMonitorProperty();
+              struct.emailMonitorProperty.read(iprot);
+              struct.setEmailMonitorPropertyIsSet(true);
             } else { 
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
@@ -951,10 +951,10 @@ import org.slf4j.LoggerFactory;
           oprot.writeFieldEnd();
         }
       }
-      if (struct.emailMonitor != null) {
-        if (struct.isSetEmailMonitor()) {
-          oprot.writeFieldBegin(EMAIL_MONITOR_FIELD_DESC);
-          struct.emailMonitor.write(oprot);
+      if (struct.emailMonitorProperty != null) {
+        if (struct.isSetEmailMonitorProperty()) {
+          oprot.writeFieldBegin(EMAIL_MONITOR_PROPERTY_FIELD_DESC);
+          struct.emailMonitorProperty.write(oprot);
           oprot.writeFieldEnd();
         }
       }
@@ -988,7 +988,7 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetMonitorMode()) {
         optionals.set(2);
       }
-      if (struct.isSetEmailMonitor()) {
+      if (struct.isSetEmailMonitorProperty()) {
         optionals.set(3);
       }
       oprot.writeBitSet(optionals, 4);
@@ -1001,8 +1001,8 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetMonitorMode()) {
         oprot.writeI32(struct.monitorMode.getValue());
       }
-      if (struct.isSetEmailMonitor()) {
-        struct.emailMonitor.write(oprot);
+      if (struct.isSetEmailMonitorProperty()) {
+        struct.emailMonitorProperty.write(oprot);
       }
     }
 
@@ -1030,9 +1030,9 @@ import org.slf4j.LoggerFactory;
         struct.setMonitorModeIsSet(true);
       }
       if (incoming.get(3)) {
-        struct.emailMonitor = new EmailMonitorProperty();
-        struct.emailMonitor.read(iprot);
-        struct.setEmailMonitorIsSet(true);
+        struct.emailMonitorProperty = new EmailMonitorProperty();
+        struct.emailMonitorProperty.read(iprot);
+        struct.setEmailMonitorPropertyIsSet(true);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
index cf9b06c..b13b2e5 100644
--- a/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
+++ b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
@@ -348,7 +348,7 @@ struct SSHJobSubmission {
     4: optional string alternativeSSHHostName,
     5: optional i32 sshPort = 22,
     6: optional MonitorMode monitorMode,
-    7: optional EmailMonitorProperty emailMonitor
+    7: optional EmailMonitorProperty emailMonitorProperty
 }
 
 struct GlobusJobSubmission {

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 2b23596..075f942 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.gsissh.provider.impl;
 
 import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.ExecutionMode;
@@ -36,11 +37,14 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
 import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
 import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
@@ -138,48 +142,53 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
           
     }
 
-    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
-        if (ServerSettings.isEmailBasedNotificationEnable()) {
-            try {
-                EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(BetterGfacImpl.getMonitorPublisher());
-                emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
-            } catch (ApplicationSettingsException e) {
-                throw new GFacHandlerException("Error while delegating job execution context to email based monitor");
-            }
-        } else {
-            List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-            if (daemonHandlers == null) {
-                daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-            }
-            ThreadedHandler pullMonitorHandler = null;
-            ThreadedHandler pushMonitorHandler = null;
-            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
-            for (ThreadedHandler threadedHandler : daemonHandlers) {
-                if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
-                    pullMonitorHandler = threadedHandler;
-                    if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
-                        log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
-                        pullMonitorHandler.invoke(jobExecutionContext);
-                    } else {
-                        log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
-                                " to handle by the GridPullMonitorHandler");
-                    }
-                } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
-                    pushMonitorHandler = threadedHandler;
-                    if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
-                        log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
-                        pushMonitorHandler.invoke(jobExecutionContext);
-                    } else {
-                        log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
-                                " to handle by the GridPushMonitorHandler");
-                    }
+    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException, AppCatalogException {
+        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+            if (sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+                EmailMonitorProperty emailMonitorProp = sshJobSubmission.getEmailMonitorProperty();
+                if (emailMonitorProp != null) {
+                    EmailMonitorFactory emailMonitorFactory = new EmailMonitorFactory();
+                    EmailBasedMonitor emailBasedMonitor = emailMonitorFactory.getEmailBasedMonitor(emailMonitorProp);
+                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+                    return;
                 }
-                // have to handle the GridPushMonitorHandler logic
             }
-            if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
-                log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
-                        ", execution is configured as asynchronous, so Outhandler will not be invoked");
+        }
+
+        // if email monitor is not activeated or not configure we use pull or push monitor
+        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+                    log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+                            " to handle by the GridPullMonitorHandler");
+                }
+            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pushMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+                    log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+                            " to handle by the GridPushMonitorHandler");
+                }
             }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index b6bfa6c..affe156 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -21,18 +21,19 @@
 package org.apache.airavata.gfac.monitor.email;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
 import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
 import org.apache.airavata.gfac.monitor.email.parser.EmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.LonestarEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
+import org.apache.airavata.model.appcatalog.computeresource.EmailProtocol;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -59,39 +60,34 @@ public class EmailBasedMonitor implements Runnable{
     private static final String PBS_CONSULT_SDSC_EDU = "pbsconsult@sdsc.edu";
     private static final String SLURM_BATCH_STAMPEDE = "slurm@batch1.stampede.tacc.utexas.edu";
     private static final String LONESTAR_ADDRESS = "root@c312-206.ls4.tacc.utexas.edu";
-    private static EmailBasedMonitor emailBasedMonitor;
-    private final MonitorPublisher monitorPublisher;
+    private final EmailMonitorProperty emailMonitorProperty;
+    private boolean stopMonitoring = false;
 
     private Session session ;
     private Store store;
     private Folder emailFolder;
-    private String host, emailAddress, password, folderName, mailStoreProtocol;
+//    private String host, emailAddress, password, folderName, mailStoreProtocol;
     private Properties properties;
 
     private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>();
 
-    private EmailBasedMonitor(MonitorPublisher monitorPublisher) throws ApplicationSettingsException {
-        this.monitorPublisher = monitorPublisher;
+    public EmailBasedMonitor(EmailMonitorProperty emailMonitorProp) {
+        this.emailMonitorProperty = emailMonitorProp;
         init();
     }
 
-    private void init() throws ApplicationSettingsException {
-        host = ServerSettings.getEmailBasedMonitorHost();
-        emailAddress = ServerSettings.getEmailBasedMonitorAddress();
-        password = ServerSettings.getEmailBasedMonitorPassword();
-        folderName = ServerSettings.getEmailBasedMonitorFolderName();
-        mailStoreProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
-
+    private void init() {
         properties = new Properties();
-        properties.put("mail.store.protocol", mailStoreProtocol);
+        properties.put("mail.store.protocol", emailMonitorProperty.getStoreProtocol());
 
     }
 
-    public static EmailBasedMonitor getInstant(MonitorPublisher monitorPublisher) throws ApplicationSettingsException {
+/*    public static EmailBasedMonitor getInstant(EmailMonitorProperty emailMonitorProp, MonitorPublisher monitorPublisher)
+            throws ApplicationSettingsException {
         if (emailBasedMonitor == null) {
             synchronized (EmailBasedMonitor.class) {
                 if (emailBasedMonitor == null) {
-                    emailBasedMonitor = new EmailBasedMonitor(monitorPublisher);
+                    emailBasedMonitor = new EmailBasedMonitor(emailMonitorProp);
                     Thread thread = new Thread(emailBasedMonitor);
                     thread.start();
                 }
@@ -99,7 +95,7 @@ public class EmailBasedMonitor implements Runnable{
         }
 
         return emailBasedMonitor;
-    }
+    }*/
 
     public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
         addToJobMonitorMap(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext);
@@ -134,14 +130,15 @@ public class EmailBasedMonitor implements Runnable{
     public void run() {
         try {
             session = Session.getDefaultInstance(properties);
-            store = session.getStore(mailStoreProtocol);
-            store.connect(host, emailAddress, password);
-            while (!ServerSettings.isStopAllThreads()) {
+            store = session.getStore(getProtocol(emailMonitorProperty.getStoreProtocol()));
+            store.connect(emailMonitorProperty.getHost(), emailMonitorProperty.getEmailAddress(),
+                    emailMonitorProperty.getPassword());
+            while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
                 if (!store.isConnected()) {
                     store.connect();
                 }
                 Thread.sleep(2000);
-                emailFolder = store.getFolder(folderName);
+                emailFolder = store.getFolder(emailMonitorProperty.getFolderName());
                 emailFolder.open(Folder.READ_WRITE);
                 Message[] searchMessages = emailFolder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false));
                 List<Message> processedMessages = new ArrayList<>();
@@ -177,6 +174,8 @@ public class EmailBasedMonitor implements Runnable{
             log.error("Couldn't connect to the store ", e);
         } catch (InterruptedException e) {
             log.error("Interrupt exception while sleep ", e);
+        } catch (AiravataException e) {
+            log.error("UnHandled arguments ", e);
         } finally {
             try {
                 store.close();
@@ -186,11 +185,21 @@ public class EmailBasedMonitor implements Runnable{
         }
     }
 
+    private String getProtocol(EmailProtocol storeProtocol) throws AiravataException {
+        switch (storeProtocol) {
+            case IMAPS:
+                return "imaps";
+            case POP3:
+                return "pop3";
+            default:
+                throw new AiravataException("Unhandled Email store protocol ");
+        }
+    }
     private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){
         JobState resultState = jobStatusResult.getState();
         jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
         if (resultState == JobState.COMPLETE) {
-            GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher));
+            GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher()));
         }else if (resultState == JobState.QUEUED) {
             // TODO - publish queued rabbitmq message
         }else if (resultState == JobState.FAILED) {
@@ -216,7 +225,7 @@ public class EmailBasedMonitor implements Runnable{
                         "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
                 jobStatus.getJobIdentity().getTaskId());
 
-        monitorPublisher.publish(jobStatus);
+        BetterGfacImpl.getMonitorPublisher().publish(jobStatus);
     }
 
     private void writeEnvelopeOnError(Message m) throws MessagingException {
@@ -235,4 +244,8 @@ public class EmailBasedMonitor implements Runnable{
         if (m.getSubject() != null)
             log.error("SUBJECT: " + m.getSubject());
     }
+
+    public void stopMonitoring() {
+        stopMonitoring = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
new file mode 100644
index 0000000..2c24973
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email;
+
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EmailMonitorFactory {
+
+    private Map<String, EmailBasedMonitor> emailMonitors = new HashMap<String, EmailBasedMonitor>();
+
+
+    public synchronized EmailBasedMonitor getEmailBasedMonitor(EmailMonitorProperty emailMonitorProp) {
+        String key = getKey(emailMonitorProp);
+        EmailBasedMonitor monitor = emailMonitors.get(key);
+        if (monitor == null) {
+            monitor = new EmailBasedMonitor(emailMonitorProp);
+            emailMonitors.put(key, monitor);
+            new Thread(monitor).start();
+        }
+        return monitor;
+    }
+
+    public void stopAllMonitors() {
+        for (EmailBasedMonitor emailBasedMonitor : emailMonitors.values()) {
+            emailBasedMonitor.stopMonitoring();
+        }
+    }
+
+    private String getKey(EmailMonitorProperty emailMonitorProp) {
+        StringBuffer sb = new StringBuffer(emailMonitorProp.getHost().trim());
+        sb.append("_").append(emailMonitorProp.getStoreProtocol().name());
+        sb.append("_").append(emailMonitorProp.getEmailAddress().trim());
+        sb.append("_").append(emailMonitorProp.getFolderName().trim());
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index b201c79..7c29352 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.gfac.ssh.provider.impl;
 
+import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.Constants;
@@ -36,6 +37,7 @@ import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
 import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
 import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -47,9 +49,12 @@ import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
 import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
 import org.apache.airavata.model.appcatalog.appinterface.DataType;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.ErrorCategory;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
@@ -369,33 +374,40 @@ public class SSHProvider extends AbstractProvider {
         return stdOutputString;
     }
 
-    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        if (ServerSettings.isEmailBasedNotificationEnable()) {
-            try {
-                EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(BetterGfacImpl.getMonitorPublisher());
-                emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
-            } catch (ApplicationSettingsException e) {
-                throw new GFacHandlerException("Error while delegating job execution context to email based monitor");
-            }
-        } else {
-            List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-            if (daemonHandlers == null) {
-                daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-            }
-            ThreadedHandler pullMonitorHandler = null;
-            ThreadedHandler pushMonitorHandler = null;
-            for (ThreadedHandler threadedHandler : daemonHandlers) {
-                if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
-                    pullMonitorHandler = threadedHandler;
-                    pullMonitorHandler.invoke(jobExecutionContext);
+    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException, AppCatalogException {
+        if (jobExecutionContext.getPreferredJobSubmissionProtocol()== JobSubmissionProtocol.SSH) {
+            String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+            SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+            if (sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+                EmailMonitorProperty emailMonitorProp = sshJobSubmission.getEmailMonitorProperty();
+                if (emailMonitorProp != null) {
+                    EmailMonitorFactory emailMonitorFactory = new EmailMonitorFactory();
+                    EmailBasedMonitor emailBasedMonitor = emailMonitorFactory.getEmailBasedMonitor(emailMonitorProp);
+                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+                    return;
                 }
-                // have to handle the GridPushMonitorHandler logic
             }
-            if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
-                log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
-                        ", execution is configured as asynchronous, so Outhandler will not be invoked");
+        }
+
+        // if email monitor is not activeated or not configure we use pull or push monitor
+        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                pullMonitorHandler.invoke(jobExecutionContext);
             }
+            // have to handle the GridPushMonitorHandler logic
         }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
+        }
+
     }
 
 }