You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/11 21:12:39 UTC
git commit: adding push monitoring to find the job complete and
fixing issue with output stream reading
Repository: airavata
Updated Branches:
refs/heads/master 92ad9f122 -> 68e81ef80
adding push monitoring to find the job complete and fixing issue with output stream reading
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/68e81ef8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/68e81ef8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/68e81ef8
Branch: refs/heads/master
Commit: 68e81ef80a81b1e88efeb39f0e13a0191ba2e4ce
Parents: 92ad9f1
Author: lahiru <la...@apache.org>
Authored: Thu Sep 11 15:12:25 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu Sep 11 15:12:25 2014 -0400
----------------------------------------------------------------------
.../airavata/common/utils/ServerSettings.java | 202 ++++++++++---------
.../server/src/main/resources/PBSTemplate.xslt | 2 +-
.../src/main/resources/SLURMTemplate.xslt | 1 +
.../main/resources/airavata-server.properties | 8 +-
.../airavata/gfac/core/monitor/MonitorID.java | 4 +-
.../gfac/gsissh/util/GFACGSISSHUtils.java | 3 +
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 40 +++-
.../impl/push/amqp/SimpleJobFinishConsumer.java | 76 +++++++
.../airavata/gfac/monitor/util/CommonUtils.java | 6 +-
.../airavata/gfac/ssh/util/GFACSSHUtils.java | 3 +
.../airavata/gsi/ssh/api/job/JobDescriptor.java | 7 +
.../gsi/ssh/api/job/SlurmOutputParser.java | 4 +-
.../gsi/ssh/impl/StandardOutReader.java | 4 +-
.../gsi/ssh/impl/SystemCommandOutput.java | 1 +
.../main/resources/schemas/PBSJobDescriptor.xsd | 1 +
15 files changed, 252 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 93f3bc3..a7fc02d 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -21,6 +21,8 @@
package org.apache.airavata.common.utils;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import javax.management.InstanceNotFoundException;
@@ -37,29 +39,30 @@ import org.apache.coyote.http11.Http11AprProtocol;
import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.coyote.http11.Http11Protocol;
-public class ServerSettings extends ApplicationSettings{
- private static final String SYSTEM_USER="system.user";
- private static final String SYSTEM_USER_PASSWORD="system.password";
- private static final String SYSTEM_USER_GATEWAY="system.gateway";
-
- private static final String DEFAULT_USER="default.registry.user";
- private static final String DEFAULT_USER_PASSWORD="default.registry.password";
- private static final String DEFAULT_USER_GATEWAY="default.registry.gateway";
-
- private static final String TOMCAT_PORT = "port";
- private static final String SERVER_CONTEXT_ROOT="server.context-root";
- public static final String EMBEDDED_ZK = "embedded.zk";
- private static String tomcatPort=null;
+public class ServerSettings extends ApplicationSettings {
+ private static final String SYSTEM_USER = "system.user";
+ private static final String SYSTEM_USER_PASSWORD = "system.password";
+ private static final String SYSTEM_USER_GATEWAY = "system.gateway";
- private static final String CREDENTIAL_STORE_DB_URL ="credential.store.jdbc.url";
- private static final String CREDENTIAL_STORE_DB_USER ="credential.store.jdbc.user";
- private static final String CREDENTIAL_STORE_DB_PASSWORD ="credential.store.jdbc.password";
- private static final String CREDENTIAL_STORE_DB_DRIVER ="credential.store.jdbc.driver";
+ private static final String DEFAULT_USER = "default.registry.user";
+ private static final String DEFAULT_USER_PASSWORD = "default.registry.password";
+ private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway";
- private static final String REGISTRY_DB_URL ="registry.jdbc.url";
- private static final String REGISTRY_DB_USER ="registry.jdbc.user";
- private static final String REGISTRY_DB_PASSWORD ="registry.jdbc.password";
- private static final String REGISTRY_DB_DRIVER ="registry.jdbc.driver";
+ private static final String TOMCAT_PORT = "port";
+ private static final String SERVER_CONTEXT_ROOT = "server.context-root";
+ public static final String EMBEDDED_ZK = "embedded.zk";
+ public static final String IP = "ip";
+ private static String tomcatPort = null;
+
+ private static final String CREDENTIAL_STORE_DB_URL = "credential.store.jdbc.url";
+ private static final String CREDENTIAL_STORE_DB_USER = "credential.store.jdbc.user";
+ private static final String CREDENTIAL_STORE_DB_PASSWORD = "credential.store.jdbc.password";
+ private static final String CREDENTIAL_STORE_DB_DRIVER = "credential.store.jdbc.driver";
+
+ private static final String REGISTRY_DB_URL = "registry.jdbc.url";
+ private static final String REGISTRY_DB_USER = "registry.jdbc.user";
+ private static final String REGISTRY_DB_PASSWORD = "registry.jdbc.password";
+ private static final String REGISTRY_DB_DRIVER = "registry.jdbc.driver";
private static final String ENABLE_HTTPS = "enable.https";
private static final String HOST_SCHEDULER = "host.scheduler";
private static final String MY_PROXY_SERVER = "myproxy.server";
@@ -70,32 +73,32 @@ public class ServerSettings extends ApplicationSettings{
private static boolean stopAllThreads = false;
- public static String getDefaultUser() throws ApplicationSettingsException{
- return getSetting(DEFAULT_USER);
+ public static String getDefaultUser() throws ApplicationSettingsException {
+ return getSetting(DEFAULT_USER);
}
-
- public static String getDefaultUserPassword() throws ApplicationSettingsException{
- return getSetting(DEFAULT_USER_PASSWORD);
+
+ public static String getDefaultUserPassword() throws ApplicationSettingsException {
+ return getSetting(DEFAULT_USER_PASSWORD);
}
-
- public static String getDefaultUserGateway() throws ApplicationSettingsException{
- return getSetting(DEFAULT_USER_GATEWAY);
+
+ public static String getDefaultUserGateway() throws ApplicationSettingsException {
+ return getSetting(DEFAULT_USER_GATEWAY);
}
-
- public static String getSystemUser() throws ApplicationSettingsException{
- return getSetting(SYSTEM_USER);
+
+ public static String getSystemUser() throws ApplicationSettingsException {
+ return getSetting(SYSTEM_USER);
}
-
- public static String getSystemUserPassword() throws ApplicationSettingsException{
- return getSetting(SYSTEM_USER_PASSWORD);
+
+ public static String getSystemUserPassword() throws ApplicationSettingsException {
+ return getSetting(SYSTEM_USER_PASSWORD);
}
-
- public static String getSystemUserGateway() throws ApplicationSettingsException{
- return getSetting(SYSTEM_USER_GATEWAY);
+
+ public static String getSystemUserGateway() throws ApplicationSettingsException {
+ return getSetting(SYSTEM_USER_GATEWAY);
}
- public static String getServerContextRoot(){
- return getSetting(SERVER_CONTEXT_ROOT,"axis2");
+ public static String getServerContextRoot() {
+ return getSetting(SERVER_CONTEXT_ROOT, "axis2");
}
public static String getCredentialStoreDBUser() throws ApplicationSettingsException {
@@ -140,57 +143,57 @@ public class ServerSettings extends ApplicationSettings{
}
public static String getTomcatPort(String protocol) throws ApplicationSettingsException {
- if (tomcatPort==null) {
- try {
- //First try to get the port from a tomcat if it is already running
- ArrayList<MBeanServer> mBeanServers = MBeanServerFactory
- .findMBeanServer(null);
- if (mBeanServers.size() > 0) {
- MBeanServer mBeanServer = mBeanServers.get(0);
- Server server = null;
- String[] domains = mBeanServer.getDomains();
- for (String domain : domains) {
- try {
- server = (Server) mBeanServer.getAttribute(
- new ObjectName(domain, "type", "Server"),
- "managedResource");
- break;
- } catch (InstanceNotFoundException e) {
- //ignore
- }
- }
- if (server != null) {
- Service[] findServices = server.findServices();
- for (Service service : findServices) {
- for (Connector connector : service.findConnectors()) {
- ProtocolHandler protocolHandler = connector.getProtocolHandler();
- if(protocol != null && protocol.equals(connector.getScheme())){
- if (protocolHandler instanceof Http11Protocol
- || protocolHandler instanceof Http11AprProtocol
- || protocolHandler instanceof Http11NioProtocol) {
- Http11Protocol p = (Http11Protocol) protocolHandler;
- if (p.getSslImplementationName() == null
- || p.getSslImplementationName()
- .equals("")) {
- tomcatPort = String.valueOf(connector
- .getPort());
- }
+ if (tomcatPort == null) {
+ try {
+ //First try to get the port from a tomcat if it is already running
+ ArrayList<MBeanServer> mBeanServers = MBeanServerFactory
+ .findMBeanServer(null);
+ if (mBeanServers.size() > 0) {
+ MBeanServer mBeanServer = mBeanServers.get(0);
+ Server server = null;
+ String[] domains = mBeanServer.getDomains();
+ for (String domain : domains) {
+ try {
+ server = (Server) mBeanServer.getAttribute(
+ new ObjectName(domain, "type", "Server"),
+ "managedResource");
+ break;
+ } catch (InstanceNotFoundException e) {
+ //ignore
+ }
+ }
+ if (server != null) {
+ Service[] findServices = server.findServices();
+ for (Service service : findServices) {
+ for (Connector connector : service.findConnectors()) {
+ ProtocolHandler protocolHandler = connector.getProtocolHandler();
+ if (protocol != null && protocol.equals(connector.getScheme())) {
+ if (protocolHandler instanceof Http11Protocol
+ || protocolHandler instanceof Http11AprProtocol
+ || protocolHandler instanceof Http11NioProtocol) {
+ Http11Protocol p = (Http11Protocol) protocolHandler;
+ if (p.getSslImplementationName() == null
+ || p.getSslImplementationName()
+ .equals("")) {
+ tomcatPort = String.valueOf(connector
+ .getPort());
+ }
+ }
}
- }
- }
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- //if unable to determine the server port from a running tomcat server, get it from
- //the server settings file
- if (tomcatPort == null) {
- tomcatPort = getSetting(TOMCAT_PORT);
- }
- }
- return tomcatPort;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ //if unable to determine the server port from a running tomcat server, get it from
+ //the server settings file
+ if (tomcatPort == null) {
+ tomcatPort = getSetting(TOMCAT_PORT);
+ }
+ }
+ return tomcatPort;
}
public static String getHostScheduler() throws ApplicationSettingsException {
@@ -204,22 +207,41 @@ public class ServerSettings extends ApplicationSettings{
public static void setStopAllThreads(boolean stopAllThreads) {
ServerSettings.stopAllThreads = stopAllThreads;
}
+
public static String getMyProxyServer() throws ApplicationSettingsException {
return getSetting(MY_PROXY_SERVER);
}
+
public static String getMyProxyUser() throws ApplicationSettingsException {
return getSetting(MY_PROXY_USER);
}
+
public static String getMyProxyPassword() throws ApplicationSettingsException {
return getSetting(MY_PROXY_PASSWORD);
}
+
public static int getMyProxyLifetime() throws ApplicationSettingsException {
return Integer.parseInt(getSetting(MY_PROXY_LIFETIME));
}
+
public static String[] getActivityListeners() throws ApplicationSettingsException {
return getSetting(ACTIVITY_LISTENERS).split(",");
}
+
public static boolean isEmbeddedZK() {
return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
}
+
+ public static String getIp() {
+ try {
+ return getSetting(IP);
+ } catch (ApplicationSettingsException e) {
+ try {
+ return InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e1) {
+ e1.printStackTrace();
+ }
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/configuration/server/src/main/resources/PBSTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/PBSTemplate.xslt b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
index a221ce2..3306215 100644
--- a/modules/configuration/server/src/main/resources/PBSTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
@@ -74,7 +74,7 @@ cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text
<xsl:for-each select="ns:postJobCommands/ns:command">
<xsl:value-of select="."/><xsl:text> </xsl:text>
</xsl:for-each>
-
+~/rabbitmq-java-client-bin-3.3.5/runjava.sh com.rabbitmq.examples.SimpleProducer amqp://<xsl:value-of select="ns:callBackIp"/> <xsl:value-of select="ns:userName"/>,<xsl:value-of select="ns:jobName"/>
</xsl:template>
</xsl:stylesheet>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
index 4597476..c09b35d 100644
--- a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
@@ -69,6 +69,7 @@ cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text
<xsl:for-each select="ns:inputs/ns:input">
<xsl:value-of select="."/><xsl:text> </xsl:text>
</xsl:for-each>
+~/rabbitmq-java-client-bin-3.3.5/runjava.sh com.rabbitmq.examples.SimpleProducer amqp://<xsl:value-of select="ns:callBackIp"/> <xsl:value-of select="ns:userName"/>,<xsl:value-of select="ns:jobName"/>
</xsl:template>
</xsl:stylesheet>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 10c0a51..b1152c4 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -34,9 +34,8 @@
port=8080
-# Axis2 server automatically picks up IP address from axis configuration,
-# but some DHCP enables machines do not report correct ip addresses,
-# in which case, the IP address can be manually specified.
+# Set this property when the machine where Airavata is deployed has multiple network interfaces, so the
+# IP address must be specified manually. It is used as the system's callback IP (especially in GFac).
#ip=192.2.33.12
@@ -187,7 +186,10 @@ amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
+# Set this property when the machine where Airavata is deployed has multiple network interfaces, so the
+# IP address must be specified manually. It is used as the system's callback IP (especially in GFac).
+#ip=192.2.33.12
###---------------------------Orchestrator module Configurations---------------------------###
#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 563db94..2270db4 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -193,12 +193,12 @@ public class MonitorID {
if (getFailedCount() >= 2) {
switch (this.state) {
case ACTIVE:
- this.state = JobState.COMPLETE;
+// this.state = JobState.COMPLETE;
logger.info("Failed count is high and old status is ACTIVE so we mark this as COMPLETE");
break;
case QUEUED:
logger.info("Failed count is high and old status is QUEUED so we mark this as COMPLETE");
- this.state = JobState.COMPLETE;
+// this.state = JobState.COMPLETE;
break;
default:
int loginfo = getFailedCount()+1;
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 5de902f..9e45986 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -42,6 +42,7 @@ import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
@@ -112,6 +113,7 @@ public class GFACGSISSHUtils {
ApplicationDeploymentDescriptionType app, Cluster cluster) {
JobDescriptor jobDescriptor = new JobDescriptor();
// this is common for any application descriptor
+ jobDescriptor.setCallBackIp(ServerSettings.getIp());
jobDescriptor.setInputDirectory(app.getInputDataDirectory());
jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
jobDescriptor.setExecutablePath(app.getExecutableLocation());
@@ -151,6 +153,7 @@ public class GFACGSISSHUtils {
if (app instanceof HpcApplicationDeploymentType) {
HpcApplicationDeploymentType applicationDeploymentType
= (HpcApplicationDeploymentType) app;
+ jobDescriptor.setUserName(((GSISSHAbstractCluster)cluster).getServerInfo().getUserName());
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setAllEnvExport(true);
jobDescriptor.setMailOptions("n");
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 93e1aa9..35cad83 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -44,6 +44,7 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
@@ -63,6 +64,7 @@ import com.google.common.eventbus.EventBus;
*/
public class HPCPullMonitor extends PullMonitor {
private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class);
+ public static final int FAILED_COUNT = 5;
// I think this should use DelayedBlocking Queue to do the monitoring*/
private BlockingQueue<UserMonitorData> queue;
@@ -75,6 +77,7 @@ public class HPCPullMonitor extends PullMonitor {
private LinkedBlockingQueue<String> cancelJobList;
+ private List<String> completedJobsFromPush;
private GFac gfac;
@@ -82,17 +85,21 @@ public class HPCPullMonitor extends PullMonitor {
public HPCPullMonitor() {
connections = new HashMap<String, ResourceConnection>();
- this.queue = new LinkedBlockingDeque<UserMonitorData>();
+ queue = new LinkedBlockingDeque<UserMonitorData>();
publisher = new MonitorPublisher(new EventBus());
cancelJobList = new LinkedBlockingQueue<String>();
+ completedJobsFromPush = new ArrayList<String>();
+ (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
}
public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
connections = new HashMap<String, ResourceConnection>();
- this.queue = new LinkedBlockingDeque<UserMonitorData>();
+ queue = new LinkedBlockingDeque<UserMonitorData>();
publisher = monitorPublisher;
authenticationInfo = authInfo;
cancelJobList = new LinkedBlockingQueue<String>();
+ this.completedJobsFromPush = new ArrayList<String>();
+ (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
}
public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
@@ -100,6 +107,8 @@ public class HPCPullMonitor extends PullMonitor {
this.publisher = publisher;
connections = new HashMap<String, ResourceConnection>();
cancelJobList = new LinkedBlockingQueue<String>();
+ this.completedJobsFromPush = new ArrayList<String>();
+ (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
}
@@ -171,24 +180,41 @@ public class HPCPullMonitor extends PullMonitor {
connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
connections.put(hostName, connection);
}
- // before we get the statuses, we check the cancel job list and remove them permanently
+ // before we get the statuses, we check the cancel job list and remove them permanently
List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+ Iterator<String> iterator1 = cancelJobList.iterator();
+
for(MonitorID iMonitorID:monitorID){
- for(String cancelMId:cancelJobList) {
+ while(iterator1.hasNext()) {
+ String cancelMId = iterator1.next();
if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
logger.info("Found a match in monitoring Queue, so marking this job to remove from monitor queue " + cancelMId);
logger.info("ExperimentID: " + cancelMId.split("\\+")[0] + ",TaskID: " + cancelMId.split("\\+")[1] + "JobID" + iMonitorID.getJobID());
completedJobs.add(iMonitorID);
iMonitorID.setStatus(JobState.CANCELED);
+ iterator1.remove();
+ }
+ }
+ }
+ Iterator<String> iterator = completedJobsFromPush.iterator();
+ for(MonitorID iMonitorID:monitorID){
+ while(iterator.hasNext()) {
+ String cancelMId = iterator.next();
+ if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
+ logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId);
+ completedJobs.add(iMonitorID);
+ iterator.remove();
+ iMonitorID.setStatus(JobState.COMPLETE);
}
}
}
Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
for (MonitorID iMonitorID : monitorID) {
currentMonitorID = iMonitorID;
- if (!JobState.CANCELED.equals(iMonitorID.getStatus())) {
- iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic
+ if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
+ !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
+ iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic
}
jobStatus = new JobStatusChangeRequest(iMonitorID);
// we have this JobStatus class to handle amqp monitoring
@@ -211,7 +237,7 @@ public class HPCPullMonitor extends PullMonitor {
// ExperimentState.FAILED));
logger.info(e.getLocalizedMessage(), e);
}
- } else if (iMonitorID.getFailedCount() > 2) {
+ } else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor");
iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
completedJobs.add(iMonitorID);
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
new file mode 100644
index 0000000..3d62fc0
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * Listens on an AMQP queue for job-completion notifications pushed from compute
+ * resources and appends each message body to a caller-supplied list.
+ *
+ * <p>The list is appended to from a background thread while holding the list's own
+ * monitor. Callers that read or iterate it concurrently must pass a synchronized or
+ * concurrent list, or hold the same lock while doing so.
+ */
+public class SimpleJobFinishConsumer {
+    private final static Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class);
+
+    /** Broker URI used by the single-argument constructor. */
+    public static final String DEFAULT_URI = "amqp://localhost";
+
+    /** Queue name used by the single-argument constructor. */
+    public static final String DEFAULT_QUEUE_NAME = "SimpleQueue";
+
+    // Decode message bodies explicitly rather than with the platform default charset.
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    private final List<String> completedJobsFromPush;
+    private final String uri;
+    private final String queueName;
+
+    /**
+     * Creates a consumer for {@link #DEFAULT_QUEUE_NAME} on {@link #DEFAULT_URI}.
+     *
+     * @param completedJobsFromPush list that receives each message body; must not be null
+     */
+    public SimpleJobFinishConsumer(List<String> completedJobsFromPush) {
+        this(completedJobsFromPush, DEFAULT_URI, DEFAULT_QUEUE_NAME);
+    }
+
+    /**
+     * Creates a consumer for the given broker and queue.
+     *
+     * @param completedJobsFromPush list that receives each message body; must not be null
+     * @param uri                   AMQP broker URI, e.g. {@code amqp://host}
+     * @param queueName             name of the queue to consume from
+     * @throws IllegalArgumentException if {@code completedJobsFromPush} is null
+     */
+    public SimpleJobFinishConsumer(List<String> completedJobsFromPush, String uri, String queueName) {
+        if (completedJobsFromPush == null) {
+            throw new IllegalArgumentException("completedJobsFromPush must not be null");
+        }
+        this.completedJobsFromPush = completedJobsFromPush;
+        this.uri = uri;
+        this.queueName = queueName;
+    }
+
+    /**
+     * Connects to the broker, declares the queue and starts a daemon thread that records
+     * and acknowledges every delivered message.
+     *
+     * <p>Errors are logged, not thrown: if the broker is unreachable, push monitoring is
+     * simply disabled. Any connection opened before a setup failure is closed.
+     */
+    public void listen() {
+        Connection conn = null;
+        try {
+            ConnectionFactory connFactory = new ConnectionFactory();
+            connFactory.setUri(uri);
+            conn = connFactory.newConnection();
+
+            final Channel ch = conn.createChannel();
+            // durable=false, exclusive=false, autoDelete=false, no extra arguments
+            ch.queueDeclare(queueName, false, false, false, null);
+
+            final QueueingConsumer consumer = new QueueingConsumer(ch);
+            ch.basicConsume(queueName, consumer);
+
+            final Connection connection = conn;
+            Thread listener = new Thread("SimpleJobFinishConsumer-" + queueName) {
+                @Override
+                public void run() {
+                    consume(connection, ch, consumer);
+                }
+            };
+            // A listener blocked waiting for messages should not keep the JVM alive.
+            listener.setDaemon(true);
+            listener.start();
+        } catch (Exception ex) {
+            logger.error("Cannot connect to a RabbitMQ Server at " + uri, ex);
+            logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
+            closeQuietly(conn);
+        }
+    }
+
+    /**
+     * Blocks on {@code consumer}, recording then acknowledging each delivery, until the
+     * thread is interrupted or the channel fails; always closes {@code connection} on exit.
+     */
+    private void consume(Connection connection, Channel ch, QueueingConsumer consumer) {
+        try {
+            while (!Thread.currentThread().isInterrupted()) {
+                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                String message = new String(delivery.getBody(), UTF_8);
+                logger.debug("Received job completion notification: " + message);
+                synchronized (completedJobsFromPush) {
+                    completedJobsFromPush.add(message);
+                }
+                // Acknowledge only after the message has been recorded.
+                ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.info("Push monitoring listener for queue " + queueName + " interrupted; stopping");
+        } catch (Exception ex) {
+            logger.error("Stopped consuming job completion notifications from queue " + queueName, ex);
+        } finally {
+            closeQuietly(connection);
+        }
+    }
+
+    /** Closes {@code connection} if non-null, logging instead of propagating failures. */
+    private static void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close RabbitMQ connection", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index f8e4097..6db4550 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -132,7 +132,7 @@ public class CommonUtils {
// then this is the right place to update
List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
for(HostMonitorData iHostMonitorID:hostMonitorData){
- if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
+ if(iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) {
List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
for(MonitorID iMonitorID:monitorIDs){
if(iMonitorID.getJobID().equals(monitorID.getJobID())
@@ -155,9 +155,9 @@ public class CommonUtils {
}
}
}
- throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
+ logger.error("Cannot find the given MonitorID in the queue with userName " +
monitorID.getUserName() + " and jobID " + monitorID.getJobID());
-
+ logger.info("This might not be an error because someone else removed this job from the queue");
}
public static boolean isEqual(HostDescription host1,HostDescription host2) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index ab1896e..75bd724 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -40,6 +40,7 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
@@ -94,6 +95,7 @@ public class GFACSSHUtils {
ApplicationDeploymentDescriptionType app, Cluster cluster) {
JobDescriptor jobDescriptor = new JobDescriptor();
// this is common for any application descriptor
+ jobDescriptor.setCallBackIp(ServerSettings.getIp());
jobDescriptor.setInputDirectory(app.getInputDataDirectory());
jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
jobDescriptor.setExecutablePath(app.getExecutableLocation());
@@ -134,6 +136,7 @@ public class GFACSSHUtils {
if (app instanceof HpcApplicationDeploymentType) {
HpcApplicationDeploymentType applicationDeploymentType
= (HpcApplicationDeploymentType) app;
+ jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName());
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setAllEnvExport(true);
jobDescriptor.setMailOptions("n");
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
index 529b504..3890a09 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
@@ -395,5 +395,12 @@ public class JobDescriptor {
return this.getJobDescriptorDocument().getJobDescriptor().getUserName();
}
+ public void setCallBackIp(String ip){
+ this.jobDescriptionDocument.getJobDescriptor().setCallBackIp(ip);
+ }
+
+ public String getCallBackIp(){
+ return this.jobDescriptionDocument.getJobDescriptor().getCallBackIp();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
index 2fbbd0e..e99863b 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
@@ -149,11 +149,11 @@ public class SlurmOutputParser implements OutputParser {
}
public void parse(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws SSHApiException {
- log.info(rawOutput);
+ log.debug(rawOutput);
String[] info = rawOutput.split("\n");
String lastString = info[info.length -1];
if (lastString.contains("JOBID") || lastString.contains("PARTITION")) {
- // There are no jobs for this username
+ log.info("There are no jobs with this username ... ");
return;
}
int lastStop = 0;
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
index bc0231e..0ec9992 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
@@ -51,14 +51,14 @@ public class StandardOutReader implements CommandOutput {
pbsOutput.append(new String(tmp, 0, i));
}
if (channel.isClosed()) {
- String output = pbsOutput.toString();
- this.setStdOutputString(output);
break;
}
try {
} catch (Exception ignored) {
}
}
+ String output = pbsOutput.toString();
+ this.setStdOutputString(output);
} catch (IOException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
index a6021f7..6e3b8e6 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
@@ -39,6 +39,7 @@ public class SystemCommandOutput implements CommandOutput {
public void onOutput(Channel channel) {
try {
InputStream inputStream = channel.getInputStream();
+
byte[] tmp = new byte[1024];
while (true) {
while (inputStream.available() > 0) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd b/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
index be1a374..a9917bc 100644
--- a/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
+++ b/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
@@ -56,6 +56,7 @@
<element name="preJobCommands" type="gsissh:preJobCommands" minOccurs="0" maxOccurs="1"/>
<element name="postJobCommands" type="gsissh:postJobCommands" minOccurs="0" maxOccurs="1"/>
<element name="jobSubmitterCommand" type="xsd:string" minOccurs="0" maxOccurs="1"/>
+ <element name="callBackIp" type="xsd:string" minOccurs="0" maxOccurs="1"/>
</sequence>
</complexType>