You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/11 21:12:39 UTC

git commit: adding push monitoring to find the job complete and fixing issue with output stream reading

Repository: airavata
Updated Branches:
  refs/heads/master 92ad9f122 -> 68e81ef80


adding push monitoring to find the job complete and fixing issue with output stream reading


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/68e81ef8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/68e81ef8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/68e81ef8

Branch: refs/heads/master
Commit: 68e81ef80a81b1e88efeb39f0e13a0191ba2e4ce
Parents: 92ad9f1
Author: lahiru <la...@apache.org>
Authored: Thu Sep 11 15:12:25 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu Sep 11 15:12:25 2014 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   | 202 ++++++++++---------
 .../server/src/main/resources/PBSTemplate.xslt  |   2 +-
 .../src/main/resources/SLURMTemplate.xslt       |   1 +
 .../main/resources/airavata-server.properties   |   8 +-
 .../airavata/gfac/core/monitor/MonitorID.java   |   4 +-
 .../gfac/gsissh/util/GFACGSISSHUtils.java       |   3 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |  40 +++-
 .../impl/push/amqp/SimpleJobFinishConsumer.java |  76 +++++++
 .../airavata/gfac/monitor/util/CommonUtils.java |   6 +-
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    |   3 +
 .../airavata/gsi/ssh/api/job/JobDescriptor.java |   7 +
 .../gsi/ssh/api/job/SlurmOutputParser.java      |   4 +-
 .../gsi/ssh/impl/StandardOutReader.java         |   4 +-
 .../gsi/ssh/impl/SystemCommandOutput.java       |   1 +
 .../main/resources/schemas/PBSJobDescriptor.xsd |   1 +
 15 files changed, 252 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 93f3bc3..a7fc02d 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -21,6 +21,8 @@
 
 package org.apache.airavata.common.utils;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 
 import javax.management.InstanceNotFoundException;
@@ -37,29 +39,30 @@ import org.apache.coyote.http11.Http11AprProtocol;
 import org.apache.coyote.http11.Http11NioProtocol;
 import org.apache.coyote.http11.Http11Protocol;
 
-public class ServerSettings extends ApplicationSettings{
-    private static final String SYSTEM_USER="system.user";
-    private static final String SYSTEM_USER_PASSWORD="system.password";
-    private static final String SYSTEM_USER_GATEWAY="system.gateway";
-    
-    private static final String DEFAULT_USER="default.registry.user";
-    private static final String DEFAULT_USER_PASSWORD="default.registry.password";
-    private static final String DEFAULT_USER_GATEWAY="default.registry.gateway";
-    
-    private static final String TOMCAT_PORT = "port";
-    private static final String SERVER_CONTEXT_ROOT="server.context-root";
-    public static final String EMBEDDED_ZK = "embedded.zk";
-    private static String tomcatPort=null;
+public class ServerSettings extends ApplicationSettings {
+    private static final String SYSTEM_USER = "system.user";
+    private static final String SYSTEM_USER_PASSWORD = "system.password";
+    private static final String SYSTEM_USER_GATEWAY = "system.gateway";
 
-    private static final String CREDENTIAL_STORE_DB_URL ="credential.store.jdbc.url";
-    private static final String CREDENTIAL_STORE_DB_USER ="credential.store.jdbc.user";
-    private static final String CREDENTIAL_STORE_DB_PASSWORD ="credential.store.jdbc.password";
-    private static final String CREDENTIAL_STORE_DB_DRIVER ="credential.store.jdbc.driver";
+    private static final String DEFAULT_USER = "default.registry.user";
+    private static final String DEFAULT_USER_PASSWORD = "default.registry.password";
+    private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway";
 
-    private static final String REGISTRY_DB_URL ="registry.jdbc.url";
-    private static final String REGISTRY_DB_USER ="registry.jdbc.user";
-    private static final String REGISTRY_DB_PASSWORD ="registry.jdbc.password";
-    private static final String REGISTRY_DB_DRIVER ="registry.jdbc.driver";
+    private static final String TOMCAT_PORT = "port";
+    private static final String SERVER_CONTEXT_ROOT = "server.context-root";
+    public static final String EMBEDDED_ZK = "embedded.zk";
+    public static final String IP = "ip";
+    private static String tomcatPort = null;
+
+    private static final String CREDENTIAL_STORE_DB_URL = "credential.store.jdbc.url";
+    private static final String CREDENTIAL_STORE_DB_USER = "credential.store.jdbc.user";
+    private static final String CREDENTIAL_STORE_DB_PASSWORD = "credential.store.jdbc.password";
+    private static final String CREDENTIAL_STORE_DB_DRIVER = "credential.store.jdbc.driver";
+
+    private static final String REGISTRY_DB_URL = "registry.jdbc.url";
+    private static final String REGISTRY_DB_USER = "registry.jdbc.user";
+    private static final String REGISTRY_DB_PASSWORD = "registry.jdbc.password";
+    private static final String REGISTRY_DB_DRIVER = "registry.jdbc.driver";
     private static final String ENABLE_HTTPS = "enable.https";
     private static final String HOST_SCHEDULER = "host.scheduler";
     private static final String MY_PROXY_SERVER = "myproxy.server";
@@ -70,32 +73,32 @@ public class ServerSettings extends ApplicationSettings{
 
     private static boolean stopAllThreads = false;
 
-    public static String getDefaultUser() throws ApplicationSettingsException{
-    	return getSetting(DEFAULT_USER);
+    public static String getDefaultUser() throws ApplicationSettingsException {
+        return getSetting(DEFAULT_USER);
     }
-    
-    public static String getDefaultUserPassword() throws ApplicationSettingsException{
-    	return getSetting(DEFAULT_USER_PASSWORD);
+
+    public static String getDefaultUserPassword() throws ApplicationSettingsException {
+        return getSetting(DEFAULT_USER_PASSWORD);
     }
-    
-    public static String getDefaultUserGateway() throws ApplicationSettingsException{
-    	return getSetting(DEFAULT_USER_GATEWAY);
+
+    public static String getDefaultUserGateway() throws ApplicationSettingsException {
+        return getSetting(DEFAULT_USER_GATEWAY);
     }
-    
-    public static String getSystemUser() throws ApplicationSettingsException{
-    	return getSetting(SYSTEM_USER);
+
+    public static String getSystemUser() throws ApplicationSettingsException {
+        return getSetting(SYSTEM_USER);
     }
-    
-    public static String getSystemUserPassword() throws ApplicationSettingsException{
-    	return getSetting(SYSTEM_USER_PASSWORD);
+
+    public static String getSystemUserPassword() throws ApplicationSettingsException {
+        return getSetting(SYSTEM_USER_PASSWORD);
     }
-    
-    public static String getSystemUserGateway() throws ApplicationSettingsException{
-    	return getSetting(SYSTEM_USER_GATEWAY);
+
+    public static String getSystemUserGateway() throws ApplicationSettingsException {
+        return getSetting(SYSTEM_USER_GATEWAY);
     }
 
-    public static String getServerContextRoot(){
-    	return getSetting(SERVER_CONTEXT_ROOT,"axis2");
+    public static String getServerContextRoot() {
+        return getSetting(SERVER_CONTEXT_ROOT, "axis2");
     }
 
     public static String getCredentialStoreDBUser() throws ApplicationSettingsException {
@@ -140,57 +143,57 @@ public class ServerSettings extends ApplicationSettings{
     }
 
     public static String getTomcatPort(String protocol) throws ApplicationSettingsException {
-    	if (tomcatPort==null) {
-			try {
-				//First try to get the port from a tomcat if it is already running
-				ArrayList<MBeanServer> mBeanServers = MBeanServerFactory
-						.findMBeanServer(null);
-				if (mBeanServers.size() > 0) {
-					MBeanServer mBeanServer = mBeanServers.get(0);
-					Server server = null;
-					String[] domains = mBeanServer.getDomains();
-					for (String domain : domains) {
-						try {
-							server = (Server) mBeanServer.getAttribute(
-									new ObjectName(domain, "type", "Server"),
-									"managedResource");
-							break;
-						} catch (InstanceNotFoundException e) {
-							//ignore
-						}
-					}
-					if (server != null) {
-						Service[] findServices = server.findServices();
-						for (Service service : findServices) {
-							for (Connector connector : service.findConnectors()) {
-								ProtocolHandler protocolHandler = connector.getProtocolHandler();
-                                if(protocol != null && protocol.equals(connector.getScheme())){
-								if (protocolHandler instanceof Http11Protocol
-										|| protocolHandler instanceof Http11AprProtocol
-										|| protocolHandler instanceof Http11NioProtocol) {
-									Http11Protocol p = (Http11Protocol) protocolHandler;
-									if (p.getSslImplementationName() == null
-											|| p.getSslImplementationName()
-													.equals("")) {
-										tomcatPort = String.valueOf(connector
-												.getPort());
-									}
+        if (tomcatPort == null) {
+            try {
+                //First try to get the port from a tomcat if it is already running
+                ArrayList<MBeanServer> mBeanServers = MBeanServerFactory
+                        .findMBeanServer(null);
+                if (mBeanServers.size() > 0) {
+                    MBeanServer mBeanServer = mBeanServers.get(0);
+                    Server server = null;
+                    String[] domains = mBeanServer.getDomains();
+                    for (String domain : domains) {
+                        try {
+                            server = (Server) mBeanServer.getAttribute(
+                                    new ObjectName(domain, "type", "Server"),
+                                    "managedResource");
+                            break;
+                        } catch (InstanceNotFoundException e) {
+                            //ignore
+                        }
+                    }
+                    if (server != null) {
+                        Service[] findServices = server.findServices();
+                        for (Service service : findServices) {
+                            for (Connector connector : service.findConnectors()) {
+                                ProtocolHandler protocolHandler = connector.getProtocolHandler();
+                                if (protocol != null && protocol.equals(connector.getScheme())) {
+                                    if (protocolHandler instanceof Http11Protocol
+                                            || protocolHandler instanceof Http11AprProtocol
+                                            || protocolHandler instanceof Http11NioProtocol) {
+                                        Http11Protocol p = (Http11Protocol) protocolHandler;
+                                        if (p.getSslImplementationName() == null
+                                                || p.getSslImplementationName()
+                                                .equals("")) {
+                                            tomcatPort = String.valueOf(connector
+                                                    .getPort());
+                                        }
+                                    }
                                 }
-								}
-							}
-						}
-					}
-				}
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-			//if unable to determine the server port from a running tomcat server, get it from 
-			//the server settings file
-			if (tomcatPort == null) {
-				tomcatPort = getSetting(TOMCAT_PORT);
-			}
-		}
-		return tomcatPort;
+                            }
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            //if unable to determine the server port from a running tomcat server, get it from
+            //the server settings file
+            if (tomcatPort == null) {
+                tomcatPort = getSetting(TOMCAT_PORT);
+            }
+        }
+        return tomcatPort;
     }
 
     public static String getHostScheduler() throws ApplicationSettingsException {
@@ -204,22 +207,41 @@ public class ServerSettings extends ApplicationSettings{
     public static void setStopAllThreads(boolean stopAllThreads) {
         ServerSettings.stopAllThreads = stopAllThreads;
     }
+
     public static String getMyProxyServer() throws ApplicationSettingsException {
         return getSetting(MY_PROXY_SERVER);
     }
+
     public static String getMyProxyUser() throws ApplicationSettingsException {
         return getSetting(MY_PROXY_USER);
     }
+
     public static String getMyProxyPassword() throws ApplicationSettingsException {
         return getSetting(MY_PROXY_PASSWORD);
     }
+
     public static int getMyProxyLifetime() throws ApplicationSettingsException {
         return Integer.parseInt(getSetting(MY_PROXY_LIFETIME));
     }
+
     public static String[] getActivityListeners() throws ApplicationSettingsException {
         return getSetting(ACTIVITY_LISTENERS).split(",");
     }
+
     public static boolean isEmbeddedZK() {
         return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
     }
+
+    public static String getIp() {
+        try {
+            return getSetting(IP);
+        } catch (ApplicationSettingsException e) {
+            try {
+                return InetAddress.getLocalHost().getHostAddress();
+            } catch (UnknownHostException e1) {
+                e1.printStackTrace();
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/configuration/server/src/main/resources/PBSTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/PBSTemplate.xslt b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
index a221ce2..3306215 100644
--- a/modules/configuration/server/src/main/resources/PBSTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/PBSTemplate.xslt
@@ -74,7 +74,7 @@ cd <xsl:text>   </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text
 <xsl:for-each select="ns:postJobCommands/ns:command">
       <xsl:value-of select="."/><xsl:text>   </xsl:text>
 </xsl:for-each>
-
+~/rabbitmq-java-client-bin-3.3.5/runjava.sh com.rabbitmq.examples.SimpleProducer amqp://<xsl:value-of select="ns:callBackIp"/> <xsl:value-of select="ns:userName"/>,<xsl:value-of select="ns:jobName"/>
 </xsl:template>
 
 </xsl:stylesheet>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
index 4597476..c09b35d 100644
--- a/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
+++ b/modules/configuration/server/src/main/resources/SLURMTemplate.xslt
@@ -69,6 +69,7 @@ cd <xsl:text>   </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text
 <xsl:for-each select="ns:inputs/ns:input">
       <xsl:value-of select="."/><xsl:text>   </xsl:text>
     </xsl:for-each>
+~/rabbitmq-java-client-bin-3.3.5/runjava.sh com.rabbitmq.examples.SimpleProducer amqp://<xsl:value-of select="ns:callBackIp"/> <xsl:value-of select="ns:userName"/>,<xsl:value-of select="ns:jobName"/>
 </xsl:template>
 
 </xsl:stylesheet>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 10c0a51..b1152c4 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -34,9 +34,8 @@
 
 port=8080
 
-# Axis2 server automatically picks up IP address from axis configuration,
-#  but some DHCP enables machines do not report correct ip addresses,
-#  in which case, the IP address can be manually specified.
+#This property is useful when the machine where airavata is deployed has multiple network interfaces:
+#users can then specify the ip address manually, and it is used as the callback ip of the system (especially in gfac).
 
 #ip=192.2.33.12
 
@@ -187,7 +186,10 @@ amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
 activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
+#This property is useful when the machine where airavata is deployed has multiple network interfaces:
+#users can then specify the ip address manually, and it is used as the callback ip of the system (especially in gfac).
 
+#ip=192.2.33.12
 ###---------------------------Orchestrator module Configurations---------------------------###
 #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
 job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 563db94..2270db4 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -193,12 +193,12 @@ public class MonitorID {
             if (getFailedCount() >= 2) {
                 switch (this.state) {
                     case ACTIVE:
-                        this.state = JobState.COMPLETE;
+//                        this.state = JobState.COMPLETE;
                         logger.info("Failed count is high and old status is ACTIVE so we mark this as COMPLETE");
                         break;
                     case QUEUED:
                         logger.info("Failed count is high and old status is QUEUED so we mark this as COMPLETE");
-                        this.state = JobState.COMPLETE;
+//                        this.state = JobState.COMPLETE;
                         break;
                     default:
                         int loginfo = getFailedCount()+1;

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 5de902f..9e45986 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -42,6 +42,7 @@ import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
 import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
@@ -112,6 +113,7 @@ public class GFACGSISSHUtils {
                                                     ApplicationDeploymentDescriptionType app, Cluster cluster) {
         JobDescriptor jobDescriptor = new JobDescriptor();
         // this is common for any application descriptor
+        jobDescriptor.setCallBackIp(ServerSettings.getIp());
         jobDescriptor.setInputDirectory(app.getInputDataDirectory());
         jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
         jobDescriptor.setExecutablePath(app.getExecutableLocation());
@@ -151,6 +153,7 @@ public class GFACGSISSHUtils {
         if (app instanceof HpcApplicationDeploymentType) {
             HpcApplicationDeploymentType applicationDeploymentType
                     = (HpcApplicationDeploymentType) app;
+            jobDescriptor.setUserName(((GSISSHAbstractCluster)cluster).getServerInfo().getUserName());
             jobDescriptor.setShellName("/bin/bash");
             jobDescriptor.setAllEnvExport(true);
             jobDescriptor.setMailOptions("n");

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 93e1aa9..35cad83 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -44,6 +44,7 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.core.PullMonitor;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
@@ -63,6 +64,7 @@ import com.google.common.eventbus.EventBus;
  */
 public class HPCPullMonitor extends PullMonitor {
     private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class);
+    public static final int FAILED_COUNT = 5;
 
     // I think this should use DelayedBlocking Queue to do the monitoring*/
     private BlockingQueue<UserMonitorData> queue;
@@ -75,6 +77,7 @@ public class HPCPullMonitor extends PullMonitor {
 
     private LinkedBlockingQueue<String> cancelJobList;
 
+    private List<String> completedJobsFromPush;
 
     private GFac gfac;
 
@@ -82,17 +85,21 @@ public class HPCPullMonitor extends PullMonitor {
 
     public HPCPullMonitor() {
         connections = new HashMap<String, ResourceConnection>();
-        this.queue = new LinkedBlockingDeque<UserMonitorData>();
+        queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = new MonitorPublisher(new EventBus());
         cancelJobList = new LinkedBlockingQueue<String>();
+        completedJobsFromPush = new ArrayList<String>();
+        (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
     }
 
     public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
         connections = new HashMap<String, ResourceConnection>();
-        this.queue = new LinkedBlockingDeque<UserMonitorData>();
+        queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = monitorPublisher;
         authenticationInfo = authInfo;
         cancelJobList = new LinkedBlockingQueue<String>();
+        this.completedJobsFromPush = new ArrayList<String>();
+        (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
     }
 
     public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
@@ -100,6 +107,8 @@ public class HPCPullMonitor extends PullMonitor {
         this.publisher = publisher;
         connections = new HashMap<String, ResourceConnection>();
         cancelJobList = new LinkedBlockingQueue<String>();
+        this.completedJobsFromPush = new ArrayList<String>();
+        (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
     }
 
 
@@ -171,24 +180,41 @@ public class HPCPullMonitor extends PullMonitor {
                         connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
                         connections.put(hostName, connection);
                     }
-                    // before we get the statuses, we check the cancel job list and remove them permanently
 
+                    // before we get the statuses, we check the cancel job list and remove them permanently
                     List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+                    Iterator<String> iterator1 = cancelJobList.iterator();
+
                     for(MonitorID iMonitorID:monitorID){
-                        for(String cancelMId:cancelJobList) {
+                        while(iterator1.hasNext()) {
+                            String cancelMId = iterator1.next();
                             if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
                                 logger.info("Found a match in monitoring Queue, so marking this job to remove from monitor queue " + cancelMId);
                                 logger.info("ExperimentID: " + cancelMId.split("\\+")[0] + ",TaskID: " + cancelMId.split("\\+")[1] + "JobID" + iMonitorID.getJobID());
                                 completedJobs.add(iMonitorID);
                                 iMonitorID.setStatus(JobState.CANCELED);
+                                iterator1.remove();
+                            }
+                        }
+                    }
+                    Iterator<String> iterator = completedJobsFromPush.iterator();
+                    for(MonitorID iMonitorID:monitorID){
+                        while(iterator.hasNext()) {
+                            String cancelMId = iterator.next();
+                            if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
+                                logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId);
+                                completedJobs.add(iMonitorID);
+                                iterator.remove();
+                                iMonitorID.setStatus(JobState.COMPLETE);
                             }
                         }
                     }
                     Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
                     for (MonitorID iMonitorID : monitorID) {
                         currentMonitorID = iMonitorID;
-                        if (!JobState.CANCELED.equals(iMonitorID.getStatus())) {
-                            iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic
+                        if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
+                                !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
+                            iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
                         }
                             jobStatus = new JobStatusChangeRequest(iMonitorID);
                             // we have this JobStatus class to handle amqp monitoring
@@ -211,7 +237,7 @@ public class HPCPullMonitor extends PullMonitor {
 //										ExperimentState.FAILED));
                                     logger.info(e.getLocalizedMessage(), e);
                                 }
-                            } else if (iMonitorID.getFailedCount() > 2) {
+                            } else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
                                 logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor");
                                 iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                                 completedJobs.add(iMonitorID);

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
new file mode 100644
index 0000000..3d62fc0
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class SimpleJobFinishConsumer {
+    private final static Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class);
+
+    private List<String> completedJobsFromPush;
+
+    public SimpleJobFinishConsumer(List<String> completedJobsFromPush) {
+        this.completedJobsFromPush = completedJobsFromPush;
+    }
+
+    public void listen() {
+        try {
+            String uri = "amqp://localhost";
+            String queueName = "SimpleQueue";
+
+            ConnectionFactory connFactory = new ConnectionFactory();
+            connFactory.setUri(uri);
+            Connection conn = connFactory.newConnection();
+
+            final Channel ch = conn.createChannel();
+
+            ch.queueDeclare(queueName, false, false, false, null);
+
+            final QueueingConsumer consumer = new QueueingConsumer(ch);
+            ch.basicConsume(queueName, consumer);
+            (new Thread() {
+                public void run() {
+                    try {
+                        while (true) {
+                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                            System.out.println(new String(delivery.getBody()));
+                            completedJobsFromPush.add(new String(delivery.getBody()));
+                            ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+                        }
+                    } catch (Exception ex) {
+                        logger.error("Cannot connect to a RabbitMQ Server: " + ex);
+                    }
+                }
+
+            }).start();
+        } catch (Exception ex) {
+            logger.error("Cannot connect to a RabbitMQ Server: " + ex);
+            logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index f8e4097..6db4550 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -132,7 +132,7 @@ public class CommonUtils {
                 // then this is the right place to update
                 List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
                 for(HostMonitorData iHostMonitorID:hostMonitorData){
-                    if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
+                    if(iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) {
                         List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
                         for(MonitorID iMonitorID:monitorIDs){
                             if(iMonitorID.getJobID().equals(monitorID.getJobID())
@@ -155,9 +155,9 @@ public class CommonUtils {
                 }
             }
         }
-        throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
+        logger.error("Cannot find the given MonitorID in the queue with userName " +
                 monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
-
+        logger.info("This might not be an error because someone else removed this job from the queue");
     }
 
     public static boolean isEqual(HostDescription host1,HostDescription host2) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index ab1896e..75bd724 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -40,6 +40,7 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
@@ -94,6 +95,7 @@ public class GFACSSHUtils {
                                                     ApplicationDeploymentDescriptionType app, Cluster cluster) {
         JobDescriptor jobDescriptor = new JobDescriptor();
         // this is common for any application descriptor
+        jobDescriptor.setCallBackIp(ServerSettings.getIp());
         jobDescriptor.setInputDirectory(app.getInputDataDirectory());
         jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
         jobDescriptor.setExecutablePath(app.getExecutableLocation());
@@ -134,6 +136,7 @@ public class GFACSSHUtils {
         if (app instanceof HpcApplicationDeploymentType) {
             HpcApplicationDeploymentType applicationDeploymentType
                     = (HpcApplicationDeploymentType) app;
+            jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName());
             jobDescriptor.setShellName("/bin/bash");
             jobDescriptor.setAllEnvExport(true);
             jobDescriptor.setMailOptions("n");

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
index 529b504..3890a09 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
@@ -395,5 +395,12 @@ public class JobDescriptor {
         return this.getJobDescriptorDocument().getJobDescriptor().getUserName();
     }
 
+    public void setCallBackIp(String ip){
+        this.jobDescriptionDocument.getJobDescriptor().setCallBackIp(ip);
+    }
+
+    public String getCallBackIp(){
+        return this.jobDescriptionDocument.getJobDescriptor().getCallBackIp();
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
index 2fbbd0e..e99863b 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
@@ -149,11 +149,11 @@ public class SlurmOutputParser implements OutputParser {
     }
 
     public void parse(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws SSHApiException {
-        log.info(rawOutput);
+        log.debug(rawOutput);
         String[] info = rawOutput.split("\n");
         String lastString = info[info.length -1];
         if (lastString.contains("JOBID") || lastString.contains("PARTITION")) {
-            // There are no jobs for this username
+            log.info("There are no jobs with this username ... ");
             return;
         }
         int lastStop = 0;

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
index bc0231e..0ec9992 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
@@ -51,14 +51,14 @@ public class StandardOutReader implements CommandOutput {
                     pbsOutput.append(new String(tmp, 0, i));
                 }
                 if (channel.isClosed()) {
-                    String output = pbsOutput.toString();
-                    this.setStdOutputString(output);
                     break;
                 }
                 try {
                 } catch (Exception ignored) {
                 }
             }
+            String output = pbsOutput.toString();
+            this.setStdOutputString(output);
         } catch (IOException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
index a6021f7..6e3b8e6 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/SystemCommandOutput.java
@@ -39,6 +39,7 @@ public class SystemCommandOutput implements CommandOutput {
     public void onOutput(Channel channel) {
         try {
             InputStream inputStream = channel.getInputStream();
+
             byte[] tmp = new byte[1024];
             while (true) {
                 while (inputStream.available() > 0) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/68e81ef8/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd b/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
index be1a374..a9917bc 100644
--- a/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
+++ b/tools/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
@@ -56,6 +56,7 @@
             <element name="preJobCommands" type="gsissh:preJobCommands" minOccurs="0" maxOccurs="1"/>
             <element name="postJobCommands" type="gsissh:postJobCommands" minOccurs="0" maxOccurs="1"/>
             <element name="jobSubmitterCommand" type="xsd:string" minOccurs="0" maxOccurs="1"/>
+            <element name="callBackIp" type="xsd:string" minOccurs="0" maxOccurs="1"/>
 		</sequence>
 	</complexType>