You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/04/10 22:33:58 UTC
[4/4] airavata git commit: AIRAVATA-1666 - Added compute resource level email based monitoring check.
AIRAVATA-1666 - Added compute resource level email based monitoring check.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/93d4421b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/93d4421b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/93d4421b
Branch: refs/heads/master
Commit: 93d4421b7e5c0d32fc62ceae3d0d0c88cdaf7838
Parents: 042e25c
Author: shamrath <sh...@gmail.com>
Authored: Fri Apr 10 11:50:20 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri Apr 10 11:50:20 2015 -0400
----------------------------------------------------------------------
.../lib/airavata/computeResourceModel_types.cpp | 12 +-
.../lib/airavata/computeResourceModel_types.h | 16 +--
.../Model/AppCatalog/ComputeResource/Types.php | 20 ++--
.../model/appcatalog/computeresource/ttypes.py | 18 +--
.../client/samples/CreateLaunchExperiment.java | 21 ++--
.../computeresource/SSHJobSubmission.java | 112 +++++++++----------
.../computeResourceModel.thrift | 2 +-
.../gsissh/provider/impl/GSISSHProvider.java | 87 +++++++-------
.../gfac/monitor/email/EmailBasedMonitor.java | 61 ++++++----
.../gfac/monitor/email/EmailMonitorFactory.java | 58 ++++++++++
.../gfac/ssh/provider/impl/SSHProvider.java | 58 ++++++----
11 files changed, 279 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
index 8b83573..2a57b7d 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp
@@ -1142,8 +1142,8 @@ uint32_t SSHJobSubmission::read(::apache::thrift::protocol::TProtocol* iprot) {
break;
case 7:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
- xfer += this->emailMonitor.read(iprot);
- this->__isset.emailMonitor = true;
+ xfer += this->emailMonitorProperty.read(iprot);
+ this->__isset.emailMonitorProperty = true;
} else {
xfer += iprot->skip(ftype);
}
@@ -1197,9 +1197,9 @@ uint32_t SSHJobSubmission::write(::apache::thrift::protocol::TProtocol* oprot) c
xfer += oprot->writeI32((int32_t)this->monitorMode);
xfer += oprot->writeFieldEnd();
}
- if (this->__isset.emailMonitor) {
- xfer += oprot->writeFieldBegin("emailMonitor", ::apache::thrift::protocol::T_STRUCT, 7);
- xfer += this->emailMonitor.write(oprot);
+ if (this->__isset.emailMonitorProperty) {
+ xfer += oprot->writeFieldBegin("emailMonitorProperty", ::apache::thrift::protocol::T_STRUCT, 7);
+ xfer += this->emailMonitorProperty.write(oprot);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
@@ -1215,7 +1215,7 @@ void swap(SSHJobSubmission &a, SSHJobSubmission &b) {
swap(a.alternativeSSHHostName, b.alternativeSSHHostName);
swap(a.sshPort, b.sshPort);
swap(a.monitorMode, b.monitorMode);
- swap(a.emailMonitor, b.emailMonitor);
+ swap(a.emailMonitorProperty, b.emailMonitorProperty);
swap(a.__isset, b.__isset);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
index 05a2dfd..2b72a04 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h
@@ -647,11 +647,11 @@ class LOCALDataMovement {
void swap(LOCALDataMovement &a, LOCALDataMovement &b);
typedef struct _SSHJobSubmission__isset {
- _SSHJobSubmission__isset() : alternativeSSHHostName(false), sshPort(true), monitorMode(false), emailMonitor(false) {}
+ _SSHJobSubmission__isset() : alternativeSSHHostName(false), sshPort(true), monitorMode(false), emailMonitorProperty(false) {}
bool alternativeSSHHostName;
bool sshPort;
bool monitorMode;
- bool emailMonitor;
+ bool emailMonitorProperty;
} _SSHJobSubmission__isset;
class SSHJobSubmission {
@@ -671,7 +671,7 @@ class SSHJobSubmission {
std::string alternativeSSHHostName;
int32_t sshPort;
MonitorMode::type monitorMode;
- EmailMonitorProperty emailMonitor;
+ EmailMonitorProperty emailMonitorProperty;
_SSHJobSubmission__isset __isset;
@@ -702,9 +702,9 @@ class SSHJobSubmission {
__isset.monitorMode = true;
}
- void __set_emailMonitor(const EmailMonitorProperty& val) {
- emailMonitor = val;
- __isset.emailMonitor = true;
+ void __set_emailMonitorProperty(const EmailMonitorProperty& val) {
+ emailMonitorProperty = val;
+ __isset.emailMonitorProperty = true;
}
bool operator == (const SSHJobSubmission & rhs) const
@@ -727,9 +727,9 @@ class SSHJobSubmission {
return false;
else if (__isset.monitorMode && !(monitorMode == rhs.monitorMode))
return false;
- if (__isset.emailMonitor != rhs.__isset.emailMonitor)
+ if (__isset.emailMonitorProperty != rhs.__isset.emailMonitorProperty)
return false;
- else if (__isset.emailMonitor && !(emailMonitor == rhs.emailMonitor))
+ else if (__isset.emailMonitorProperty && !(emailMonitorProperty == rhs.emailMonitorProperty))
return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
index e488087..22c46a9 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php
@@ -1232,7 +1232,7 @@ class SSHJobSubmission {
public $alternativeSSHHostName = null;
public $sshPort = 22;
public $monitorMode = null;
- public $emailMonitor = null;
+ public $emailMonitorProperty = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -1263,7 +1263,7 @@ class SSHJobSubmission {
'type' => TType::I32,
),
7 => array(
- 'var' => 'emailMonitor',
+ 'var' => 'emailMonitorProperty',
'type' => TType::STRUCT,
'class' => '\Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty',
),
@@ -1288,8 +1288,8 @@ class SSHJobSubmission {
if (isset($vals['monitorMode'])) {
$this->monitorMode = $vals['monitorMode'];
}
- if (isset($vals['emailMonitor'])) {
- $this->emailMonitor = $vals['emailMonitor'];
+ if (isset($vals['emailMonitorProperty'])) {
+ $this->emailMonitorProperty = $vals['emailMonitorProperty'];
}
}
}
@@ -1358,8 +1358,8 @@ class SSHJobSubmission {
break;
case 7:
if ($ftype == TType::STRUCT) {
- $this->emailMonitor = new \Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty();
- $xfer += $this->emailMonitor->read($input);
+ $this->emailMonitorProperty = new \Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty();
+ $xfer += $this->emailMonitorProperty->read($input);
} else {
$xfer += $input->skip($ftype);
}
@@ -1410,12 +1410,12 @@ class SSHJobSubmission {
$xfer += $output->writeI32($this->monitorMode);
$xfer += $output->writeFieldEnd();
}
- if ($this->emailMonitor !== null) {
- if (!is_object($this->emailMonitor)) {
+ if ($this->emailMonitorProperty !== null) {
+ if (!is_object($this->emailMonitorProperty)) {
throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
}
- $xfer += $output->writeFieldBegin('emailMonitor', TType::STRUCT, 7);
- $xfer += $this->emailMonitor->write($output);
+ $xfer += $output->writeFieldBegin('emailMonitorProperty', TType::STRUCT, 7);
+ $xfer += $this->emailMonitorProperty->write($output);
$xfer += $output->writeFieldEnd();
}
$xfer += $output->writeFieldStop();
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
index e635472..8b34be1 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py
@@ -1216,7 +1216,7 @@ class SSHJobSubmission:
- alternativeSSHHostName
- sshPort
- monitorMode
- - emailMonitor
+ - emailMonitorProperty
"""
thrift_spec = (
@@ -1227,17 +1227,17 @@ class SSHJobSubmission:
(4, TType.STRING, 'alternativeSSHHostName', None, None, ), # 4
(5, TType.I32, 'sshPort', None, 22, ), # 5
(6, TType.I32, 'monitorMode', None, None, ), # 6
- (7, TType.STRUCT, 'emailMonitor', (EmailMonitorProperty, EmailMonitorProperty.thrift_spec), None, ), # 7
+ (7, TType.STRUCT, 'emailMonitorProperty', (EmailMonitorProperty, EmailMonitorProperty.thrift_spec), None, ), # 7
)
- def __init__(self, jobSubmissionInterfaceId=thrift_spec[1][4], securityProtocol=None, resourceJobManager=None, alternativeSSHHostName=None, sshPort=thrift_spec[5][4], monitorMode=None, emailMonitor=None,):
+ def __init__(self, jobSubmissionInterfaceId=thrift_spec[1][4], securityProtocol=None, resourceJobManager=None, alternativeSSHHostName=None, sshPort=thrift_spec[5][4], monitorMode=None, emailMonitorProperty=None,):
self.jobSubmissionInterfaceId = jobSubmissionInterfaceId
self.securityProtocol = securityProtocol
self.resourceJobManager = resourceJobManager
self.alternativeSSHHostName = alternativeSSHHostName
self.sshPort = sshPort
self.monitorMode = monitorMode
- self.emailMonitor = emailMonitor
+ self.emailMonitorProperty = emailMonitorProperty
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -1281,8 +1281,8 @@ class SSHJobSubmission:
iprot.skip(ftype)
elif fid == 7:
if ftype == TType.STRUCT:
- self.emailMonitor = EmailMonitorProperty()
- self.emailMonitor.read(iprot)
+ self.emailMonitorProperty = EmailMonitorProperty()
+ self.emailMonitorProperty.read(iprot)
else:
iprot.skip(ftype)
else:
@@ -1319,9 +1319,9 @@ class SSHJobSubmission:
oprot.writeFieldBegin('monitorMode', TType.I32, 6)
oprot.writeI32(self.monitorMode)
oprot.writeFieldEnd()
- if self.emailMonitor is not None:
- oprot.writeFieldBegin('emailMonitor', TType.STRUCT, 7)
- self.emailMonitor.write(oprot)
+ if self.emailMonitorProperty is not None:
+ oprot.writeFieldBegin('emailMonitorProperty', TType.STRUCT, 7)
+ self.emailMonitorProperty.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index d173a0b..9ad71f4 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,7 +61,7 @@ public class CreateLaunchExperiment {
private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60";
private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
- private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926";
+ private static String amberAppId = "Amber_717cba99-1085-45de-861c-952001c5243c";
private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6";
@@ -168,13 +168,13 @@ public class CreateLaunchExperiment {
// final String expId = createMPIExperimentForFSD(airavataClient);
// final String expId = createEchoExperimentForStampede(airavataClient);
// final String expId = createEchoExperimentForTrestles(airavataClient);
- final String expId = createExperimentEchoForLocalHost(airavataClient);
- experimentIds.add(expId);
+// final String expId = createExperimentEchoForLocalHost(airavataClient);
// final String expId = createExperimentWRFTrestles(airavataClient);
// final String expId = createExperimentForBR2(airavataClient);
// final String expId = createExperimentForBR2Amber(airavataClient);
// final String expId = createExperimentWRFStampede(airavataClient);
-// final String expId = createExperimentForStampedeAmber(airavataClient);
+ final String expId = createExperimentForStampedeAmber(airavataClient);
+// String expId = createExperimentForTrestlesAmber(airavataClient);
// final String expId = createExperimentGROMACSStampede(airavataClient);
// final String expId = createExperimentESPRESSOStampede(airavataClient);
// final String expId = createExperimentLAMMPSStampede(airavataClient);
@@ -184,6 +184,7 @@ public class CreateLaunchExperiment {
// final String expId = createExperimentForLSF(airavataClient);
// final String expId = createExperimentLAMMPSForLSF(airavataClient);
// final String expId = "Ultrascan_ln_eb029947-391a-4ccf-8ace-9bafebe07cc0";
+ experimentIds.add(expId);
System.out.println("Experiment ID : " + expId);
// updateExperiment(airavata, expId);
@@ -1312,11 +1313,11 @@ public class CreateLaunchExperiment {
// }
for (InputDataObjectType inputDataObjectType : exInputs) {
if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst");
+ inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in");
+ inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop");
+ inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
}
}
@@ -1377,11 +1378,11 @@ public class CreateLaunchExperiment {
// }
for (InputDataObjectType inputDataObjectType : exInputs) {
if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst");
+ inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in");
+ inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop");
+ inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
}
}
List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
index a9bf275..66f33ab 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField ALTERNATIVE_SSHHOST_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("alternativeSSHHostName", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField SSH_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("sshPort", org.apache.thrift.protocol.TType.I32, (short)5);
private static final org.apache.thrift.protocol.TField MONITOR_MODE_FIELD_DESC = new org.apache.thrift.protocol.TField("monitorMode", org.apache.thrift.protocol.TType.I32, (short)6);
- private static final org.apache.thrift.protocol.TField EMAIL_MONITOR_FIELD_DESC = new org.apache.thrift.protocol.TField("emailMonitor", org.apache.thrift.protocol.TType.STRUCT, (short)7);
+ private static final org.apache.thrift.protocol.TField EMAIL_MONITOR_PROPERTY_FIELD_DESC = new org.apache.thrift.protocol.TField("emailMonitorProperty", org.apache.thrift.protocol.TType.STRUCT, (short)7);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
private String alternativeSSHHostName; // optional
private int sshPort; // optional
private MonitorMode monitorMode; // optional
- private EmailMonitorProperty emailMonitor; // optional
+ private EmailMonitorProperty emailMonitorProperty; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
* @see MonitorMode
*/
MONITOR_MODE((short)6, "monitorMode"),
- EMAIL_MONITOR((short)7, "emailMonitor");
+ EMAIL_MONITOR_PROPERTY((short)7, "emailMonitorProperty");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -126,8 +126,8 @@ import org.slf4j.LoggerFactory;
return SSH_PORT;
case 6: // MONITOR_MODE
return MONITOR_MODE;
- case 7: // EMAIL_MONITOR
- return EMAIL_MONITOR;
+ case 7: // EMAIL_MONITOR_PROPERTY
+ return EMAIL_MONITOR_PROPERTY;
default:
return null;
}
@@ -170,7 +170,7 @@ import org.slf4j.LoggerFactory;
// isset id assignments
private static final int __SSHPORT_ISSET_ID = 0;
private byte __isset_bitfield = 0;
- private _Fields optionals[] = {_Fields.ALTERNATIVE_SSHHOST_NAME,_Fields.SSH_PORT,_Fields.MONITOR_MODE,_Fields.EMAIL_MONITOR};
+ private _Fields optionals[] = {_Fields.ALTERNATIVE_SSHHOST_NAME,_Fields.SSH_PORT,_Fields.MONITOR_MODE,_Fields.EMAIL_MONITOR_PROPERTY};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -186,7 +186,7 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.MONITOR_MODE, new org.apache.thrift.meta_data.FieldMetaData("monitorMode", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, MonitorMode.class)));
- tmpMap.put(_Fields.EMAIL_MONITOR, new org.apache.thrift.meta_data.FieldMetaData("emailMonitor", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ tmpMap.put(_Fields.EMAIL_MONITOR_PROPERTY, new org.apache.thrift.meta_data.FieldMetaData("emailMonitorProperty", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, EmailMonitorProperty.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(SSHJobSubmission.class, metaDataMap);
@@ -231,8 +231,8 @@ import org.slf4j.LoggerFactory;
if (other.isSetMonitorMode()) {
this.monitorMode = other.monitorMode;
}
- if (other.isSetEmailMonitor()) {
- this.emailMonitor = new EmailMonitorProperty(other.emailMonitor);
+ if (other.isSetEmailMonitorProperty()) {
+ this.emailMonitorProperty = new EmailMonitorProperty(other.emailMonitorProperty);
}
}
@@ -250,7 +250,7 @@ import org.slf4j.LoggerFactory;
this.sshPort = 22;
this.monitorMode = null;
- this.emailMonitor = null;
+ this.emailMonitorProperty = null;
}
public String getJobSubmissionInterfaceId() {
@@ -406,26 +406,26 @@ import org.slf4j.LoggerFactory;
}
}
- public EmailMonitorProperty getEmailMonitor() {
- return this.emailMonitor;
+ public EmailMonitorProperty getEmailMonitorProperty() {
+ return this.emailMonitorProperty;
}
- public void setEmailMonitor(EmailMonitorProperty emailMonitor) {
- this.emailMonitor = emailMonitor;
+ public void setEmailMonitorProperty(EmailMonitorProperty emailMonitorProperty) {
+ this.emailMonitorProperty = emailMonitorProperty;
}
- public void unsetEmailMonitor() {
- this.emailMonitor = null;
+ public void unsetEmailMonitorProperty() {
+ this.emailMonitorProperty = null;
}
- /** Returns true if field emailMonitor is set (has been assigned a value) and false otherwise */
- public boolean isSetEmailMonitor() {
- return this.emailMonitor != null;
+ /** Returns true if field emailMonitorProperty is set (has been assigned a value) and false otherwise */
+ public boolean isSetEmailMonitorProperty() {
+ return this.emailMonitorProperty != null;
}
- public void setEmailMonitorIsSet(boolean value) {
+ public void setEmailMonitorPropertyIsSet(boolean value) {
if (!value) {
- this.emailMonitor = null;
+ this.emailMonitorProperty = null;
}
}
@@ -479,11 +479,11 @@ import org.slf4j.LoggerFactory;
}
break;
- case EMAIL_MONITOR:
+ case EMAIL_MONITOR_PROPERTY:
if (value == null) {
- unsetEmailMonitor();
+ unsetEmailMonitorProperty();
} else {
- setEmailMonitor((EmailMonitorProperty)value);
+ setEmailMonitorProperty((EmailMonitorProperty)value);
}
break;
@@ -510,8 +510,8 @@ import org.slf4j.LoggerFactory;
case MONITOR_MODE:
return getMonitorMode();
- case EMAIL_MONITOR:
- return getEmailMonitor();
+ case EMAIL_MONITOR_PROPERTY:
+ return getEmailMonitorProperty();
}
throw new IllegalStateException();
@@ -536,8 +536,8 @@ import org.slf4j.LoggerFactory;
return isSetSshPort();
case MONITOR_MODE:
return isSetMonitorMode();
- case EMAIL_MONITOR:
- return isSetEmailMonitor();
+ case EMAIL_MONITOR_PROPERTY:
+ return isSetEmailMonitorProperty();
}
throw new IllegalStateException();
}
@@ -609,12 +609,12 @@ import org.slf4j.LoggerFactory;
return false;
}
- boolean this_present_emailMonitor = true && this.isSetEmailMonitor();
- boolean that_present_emailMonitor = true && that.isSetEmailMonitor();
- if (this_present_emailMonitor || that_present_emailMonitor) {
- if (!(this_present_emailMonitor && that_present_emailMonitor))
+ boolean this_present_emailMonitorProperty = true && this.isSetEmailMonitorProperty();
+ boolean that_present_emailMonitorProperty = true && that.isSetEmailMonitorProperty();
+ if (this_present_emailMonitorProperty || that_present_emailMonitorProperty) {
+ if (!(this_present_emailMonitorProperty && that_present_emailMonitorProperty))
return false;
- if (!this.emailMonitor.equals(that.emailMonitor))
+ if (!this.emailMonitorProperty.equals(that.emailMonitorProperty))
return false;
}
@@ -694,12 +694,12 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
- lastComparison = Boolean.valueOf(isSetEmailMonitor()).compareTo(other.isSetEmailMonitor());
+ lastComparison = Boolean.valueOf(isSetEmailMonitorProperty()).compareTo(other.isSetEmailMonitorProperty());
if (lastComparison != 0) {
return lastComparison;
}
- if (isSetEmailMonitor()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.emailMonitor, other.emailMonitor);
+ if (isSetEmailMonitorProperty()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.emailMonitorProperty, other.emailMonitorProperty);
if (lastComparison != 0) {
return lastComparison;
}
@@ -773,13 +773,13 @@ import org.slf4j.LoggerFactory;
}
first = false;
}
- if (isSetEmailMonitor()) {
+ if (isSetEmailMonitorProperty()) {
if (!first) sb.append(", ");
- sb.append("emailMonitor:");
- if (this.emailMonitor == null) {
+ sb.append("emailMonitorProperty:");
+ if (this.emailMonitorProperty == null) {
sb.append("null");
} else {
- sb.append(this.emailMonitor);
+ sb.append(this.emailMonitorProperty);
}
first = false;
}
@@ -805,8 +805,8 @@ import org.slf4j.LoggerFactory;
if (resourceJobManager != null) {
resourceJobManager.validate();
}
- if (emailMonitor != null) {
- emailMonitor.validate();
+ if (emailMonitorProperty != null) {
+ emailMonitorProperty.validate();
}
}
@@ -895,11 +895,11 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
- case 7: // EMAIL_MONITOR
+ case 7: // EMAIL_MONITOR_PROPERTY
if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
- struct.emailMonitor = new EmailMonitorProperty();
- struct.emailMonitor.read(iprot);
- struct.setEmailMonitorIsSet(true);
+ struct.emailMonitorProperty = new EmailMonitorProperty();
+ struct.emailMonitorProperty.read(iprot);
+ struct.setEmailMonitorPropertyIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -951,10 +951,10 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldEnd();
}
}
- if (struct.emailMonitor != null) {
- if (struct.isSetEmailMonitor()) {
- oprot.writeFieldBegin(EMAIL_MONITOR_FIELD_DESC);
- struct.emailMonitor.write(oprot);
+ if (struct.emailMonitorProperty != null) {
+ if (struct.isSetEmailMonitorProperty()) {
+ oprot.writeFieldBegin(EMAIL_MONITOR_PROPERTY_FIELD_DESC);
+ struct.emailMonitorProperty.write(oprot);
oprot.writeFieldEnd();
}
}
@@ -988,7 +988,7 @@ import org.slf4j.LoggerFactory;
if (struct.isSetMonitorMode()) {
optionals.set(2);
}
- if (struct.isSetEmailMonitor()) {
+ if (struct.isSetEmailMonitorProperty()) {
optionals.set(3);
}
oprot.writeBitSet(optionals, 4);
@@ -1001,8 +1001,8 @@ import org.slf4j.LoggerFactory;
if (struct.isSetMonitorMode()) {
oprot.writeI32(struct.monitorMode.getValue());
}
- if (struct.isSetEmailMonitor()) {
- struct.emailMonitor.write(oprot);
+ if (struct.isSetEmailMonitorProperty()) {
+ struct.emailMonitorProperty.write(oprot);
}
}
@@ -1030,9 +1030,9 @@ import org.slf4j.LoggerFactory;
struct.setMonitorModeIsSet(true);
}
if (incoming.get(3)) {
- struct.emailMonitor = new EmailMonitorProperty();
- struct.emailMonitor.read(iprot);
- struct.setEmailMonitorIsSet(true);
+ struct.emailMonitorProperty = new EmailMonitorProperty();
+ struct.emailMonitorProperty.read(iprot);
+ struct.setEmailMonitorPropertyIsSet(true);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
index cf9b06c..b13b2e5 100644
--- a/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
+++ b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift
@@ -348,7 +348,7 @@ struct SSHJobSubmission {
4: optional string alternativeSSHHostName,
5: optional i32 sshPort = 22,
6: optional MonitorMode monitorMode,
- 7: optional EmailMonitorProperty emailMonitor
+ 7: optional EmailMonitorProperty emailMonitorProperty
}
struct GlobusJobSubmission {
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 2b23596..075f942 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.gsissh.provider.impl;
import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.ExecutionMode;
@@ -36,11 +37,14 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
@@ -138,48 +142,53 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
}
- public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
- if (ServerSettings.isEmailBasedNotificationEnable()) {
- try {
- EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(BetterGfacImpl.getMonitorPublisher());
- emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- throw new GFacHandlerException("Error while delegating job execution context to email based monitor");
- }
- } else {
- List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
- pullMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
- " to handle by the GridPullMonitorHandler");
- }
- } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pushMonitorHandler = threadedHandler;
- if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
- log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
- pushMonitorHandler.invoke(jobExecutionContext);
- } else {
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
- " to handle by the GridPushMonitorHandler");
- }
+ public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException, AppCatalogException {
+ if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ if (sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ EmailMonitorProperty emailMonitorProp = sshJobSubmission.getEmailMonitorProperty();
+ if (emailMonitorProp != null) {
+ EmailMonitorFactory emailMonitorFactory = new EmailMonitorFactory();
+ EmailBasedMonitor emailBasedMonitor = emailMonitorFactory.getEmailBasedMonitor(emailMonitorProp);
+ emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+ return;
}
- // have to handle the GridPushMonitorHandler logic
}
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
+ }
+
+ // if email monitor is not activeated or not configure we use pull or push monitor
+ List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
+ pullMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+ " to handle by the GridPullMonitorHandler");
+ }
+ } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pushMonitorHandler = threadedHandler;
+ if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+ log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
+ pushMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+ " to handle by the GridPushMonitorHandler");
+ }
}
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index b6bfa6c..affe156 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -21,18 +21,19 @@
package org.apache.airavata.gfac.monitor.email;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
import org.apache.airavata.gfac.monitor.email.parser.EmailParser;
import org.apache.airavata.gfac.monitor.email.parser.LonestarEmailParser;
import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
+import org.apache.airavata.model.appcatalog.computeresource.EmailProtocol;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -59,39 +60,34 @@ public class EmailBasedMonitor implements Runnable{
private static final String PBS_CONSULT_SDSC_EDU = "pbsconsult@sdsc.edu";
private static final String SLURM_BATCH_STAMPEDE = "slurm@batch1.stampede.tacc.utexas.edu";
private static final String LONESTAR_ADDRESS = "root@c312-206.ls4.tacc.utexas.edu";
- private static EmailBasedMonitor emailBasedMonitor;
- private final MonitorPublisher monitorPublisher;
+ private final EmailMonitorProperty emailMonitorProperty;
+ private boolean stopMonitoring = false;
private Session session ;
private Store store;
private Folder emailFolder;
- private String host, emailAddress, password, folderName, mailStoreProtocol;
+// private String host, emailAddress, password, folderName, mailStoreProtocol;
private Properties properties;
private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>();
- private EmailBasedMonitor(MonitorPublisher monitorPublisher) throws ApplicationSettingsException {
- this.monitorPublisher = monitorPublisher;
+ public EmailBasedMonitor(EmailMonitorProperty emailMonitorProp) {
+ this.emailMonitorProperty = emailMonitorProp;
init();
}
- private void init() throws ApplicationSettingsException {
- host = ServerSettings.getEmailBasedMonitorHost();
- emailAddress = ServerSettings.getEmailBasedMonitorAddress();
- password = ServerSettings.getEmailBasedMonitorPassword();
- folderName = ServerSettings.getEmailBasedMonitorFolderName();
- mailStoreProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
-
+ private void init() {
properties = new Properties();
- properties.put("mail.store.protocol", mailStoreProtocol);
+ properties.put("mail.store.protocol", emailMonitorProperty.getStoreProtocol());
}
- public static EmailBasedMonitor getInstant(MonitorPublisher monitorPublisher) throws ApplicationSettingsException {
+/* public static EmailBasedMonitor getInstant(EmailMonitorProperty emailMonitorProp, MonitorPublisher monitorPublisher)
+ throws ApplicationSettingsException {
if (emailBasedMonitor == null) {
synchronized (EmailBasedMonitor.class) {
if (emailBasedMonitor == null) {
- emailBasedMonitor = new EmailBasedMonitor(monitorPublisher);
+ emailBasedMonitor = new EmailBasedMonitor(emailMonitorProp);
Thread thread = new Thread(emailBasedMonitor);
thread.start();
}
@@ -99,7 +95,7 @@ public class EmailBasedMonitor implements Runnable{
}
return emailBasedMonitor;
- }
+ }*/
public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
addToJobMonitorMap(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext);
@@ -134,14 +130,15 @@ public class EmailBasedMonitor implements Runnable{
public void run() {
try {
session = Session.getDefaultInstance(properties);
- store = session.getStore(mailStoreProtocol);
- store.connect(host, emailAddress, password);
- while (!ServerSettings.isStopAllThreads()) {
+ store = session.getStore(getProtocol(emailMonitorProperty.getStoreProtocol()));
+ store.connect(emailMonitorProperty.getHost(), emailMonitorProperty.getEmailAddress(),
+ emailMonitorProperty.getPassword());
+ while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
if (!store.isConnected()) {
store.connect();
}
Thread.sleep(2000);
- emailFolder = store.getFolder(folderName);
+ emailFolder = store.getFolder(emailMonitorProperty.getFolderName());
emailFolder.open(Folder.READ_WRITE);
Message[] searchMessages = emailFolder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false));
List<Message> processedMessages = new ArrayList<>();
@@ -177,6 +174,8 @@ public class EmailBasedMonitor implements Runnable{
log.error("Couldn't connect to the store ", e);
} catch (InterruptedException e) {
log.error("Interrupt exception while sleep ", e);
+ } catch (AiravataException e) {
+ log.error("UnHandled arguments ", e);
} finally {
try {
store.close();
@@ -186,11 +185,21 @@ public class EmailBasedMonitor implements Runnable{
}
}
+ private String getProtocol(EmailProtocol storeProtocol) throws AiravataException {
+ switch (storeProtocol) {
+ case IMAPS:
+ return "imaps";
+ case POP3:
+ return "pop3";
+ default:
+ throw new AiravataException("Unhandled Email store protocol ");
+ }
+ }
private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){
JobState resultState = jobStatusResult.getState();
jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
if (resultState == JobState.COMPLETE) {
- GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher));
+ GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher()));
}else if (resultState == JobState.QUEUED) {
// TODO - publish queued rabbitmq message
}else if (resultState == JobState.FAILED) {
@@ -216,7 +225,7 @@ public class EmailBasedMonitor implements Runnable{
"experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
jobStatus.getJobIdentity().getTaskId());
- monitorPublisher.publish(jobStatus);
+ BetterGfacImpl.getMonitorPublisher().publish(jobStatus);
}
private void writeEnvelopeOnError(Message m) throws MessagingException {
@@ -235,4 +244,8 @@ public class EmailBasedMonitor implements Runnable{
if (m.getSubject() != null)
log.error("SUBJECT: " + m.getSubject());
}
+
+ public void stopMonitoring() {
+ stopMonitoring = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
new file mode 100644
index 0000000..2c24973
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email;
+
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EmailMonitorFactory {
+
+ private Map<String, EmailBasedMonitor> emailMonitors = new HashMap<String, EmailBasedMonitor>();
+
+
+ public synchronized EmailBasedMonitor getEmailBasedMonitor(EmailMonitorProperty emailMonitorProp) {
+ String key = getKey(emailMonitorProp);
+ EmailBasedMonitor monitor = emailMonitors.get(key);
+ if (monitor == null) {
+ monitor = new EmailBasedMonitor(emailMonitorProp);
+ emailMonitors.put(key, monitor);
+ new Thread(monitor).start();
+ }
+ return monitor;
+ }
+
+ public void stopAllMonitors() {
+ for (EmailBasedMonitor emailBasedMonitor : emailMonitors.values()) {
+ emailBasedMonitor.stopMonitoring();
+ }
+ }
+
+ private String getKey(EmailMonitorProperty emailMonitorProp) {
+ StringBuffer sb = new StringBuffer(emailMonitorProp.getHost().trim());
+ sb.append("_").append(emailMonitorProp.getStoreProtocol().name());
+ sb.append("_").append(emailMonitorProp.getEmailAddress().trim());
+ sb.append("_").append(emailMonitorProp.getFolderName().trim());
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index b201c79..7c29352 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.ssh.provider.impl;
+import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.Constants;
@@ -36,6 +37,7 @@ import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -47,9 +49,12 @@ import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.JobDetails;
@@ -369,33 +374,40 @@ public class SSHProvider extends AbstractProvider {
return stdOutputString;
}
- public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- if (ServerSettings.isEmailBasedNotificationEnable()) {
- try {
- EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(BetterGfacImpl.getMonitorPublisher());
- emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- throw new GFacHandlerException("Error while delegating job execution context to email based monitor");
- }
- } else {
- List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- if (daemonHandlers == null) {
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- for (ThreadedHandler threadedHandler : daemonHandlers) {
- if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
- pullMonitorHandler = threadedHandler;
- pullMonitorHandler.invoke(jobExecutionContext);
+ public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException, AppCatalogException {
+ if (jobExecutionContext.getPreferredJobSubmissionProtocol()== JobSubmissionProtocol.SSH) {
+ String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ if (sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ EmailMonitorProperty emailMonitorProp = sshJobSubmission.getEmailMonitorProperty();
+ if (emailMonitorProp != null) {
+ EmailMonitorFactory emailMonitorFactory = new EmailMonitorFactory();
+ EmailBasedMonitor emailBasedMonitor = emailMonitorFactory.getEmailBasedMonitor(emailMonitorProp);
+ emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+ return;
}
- // have to handle the GridPushMonitorHandler logic
}
- if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
+ }
+
+ // if email monitor is not activeated or not configure we use pull or push monitor
+ List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ pullMonitorHandler.invoke(jobExecutionContext);
}
+ // have to handle the GridPushMonitorHandler logic
}
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+ }
+
}
}