You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/23 22:47:35 UTC

git commit: fixing monitoring issue with async submission

Repository: airavata
Updated Branches:
  refs/heads/master b02308493 -> 34a8147e4


fixing monitoring issue with async submission


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/34a8147e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/34a8147e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/34a8147e

Branch: refs/heads/master
Commit: 34a8147e4a6980d6fbbc59f7a8440d6797da9e1a
Parents: b023084
Author: lahiru <la...@apache.org>
Authored: Wed Apr 23 16:47:21 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Apr 23 16:47:21 2014 -0400

----------------------------------------------------------------------
 modules/distribution/server/pom.xml             |   5 +
 .../server/src/main/assembly/bin-assembly.xml   |   1 +
 .../java/org/apache/airavata/gfac/cpi/GFac.java |   5 +-
 .../org/apache/airavata/gfac/cpi/GFacImpl.java  |  34 +--
 .../apache/airavata/gfac/monitor/MonitorID.java |  13 +-
 .../airavata/gfac/handler/SCPOutputHandler.java |  98 +++++---
 .../core/impl/EmbeddedGFACJobSubmitter.java     |   8 +-
 .../orchestrator/core/job/JobSubmitter.java     |  10 +-
 .../cpi/impl/SimpleOrchestratorImpl.java        |  17 +-
 .../apache/airavata/job/monitor/MonitorID.java  | 221 -------------------
 tools/pom.xml                                   |   1 -
 11 files changed, 121 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 0ff7f4e..ad6d07c 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -286,6 +286,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-gfac-ssh</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-gfac-core</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 05f4475..4a76cd2 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -196,6 +196,7 @@
                 <include>org.apache.airavata:airavata-data-models:jar</include>
                 <include>org.apache.airavata:airavata-credential-store:jar</include>
                 <include>org.apache.airavata:airavata-gfac-core:jar</include>
+                <include>org.apache.airavata:airavata-gfac-ssh:jar</include>
                 <include>org.apache.airavata:airavata-client-api:jar</include>
                 <include>org.apache.airavata:airavata-message-monitor:jar</include>
                 <include>org.apache.airavata:airavata-workflow-model-core:jar</include>

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
index d18df07..fc5fd19 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
@@ -41,10 +41,9 @@ public interface GFac {
 
     /**
      *  This method has to be invoked after submitting the job and have to make sure job is properly finished
-     * @param experimentID
-     * @param taskID
+     * @param jobExecutionContext
      * @throws GFacException
      */
-    public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException;
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 14ea519..6312292 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -309,30 +309,20 @@ public class GFacImpl implements GFac, AbstractActivityListener {
         }
     }
 
-    public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException {
-        JobExecutionContext jobExecutionContext = null;
-        try {
-            jobExecutionContext = createJEC(experimentID, taskID);
-            Scheduler.schedule(jobExecutionContext);
-            ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
-            TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID);
-            JobDetails jobDetails = taskData.getJobDetailsList().get(0);
-            String jobDescription = jobDetails.getJobDescription();
-            if(jobDescription != null) {
-                JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDescription);
-                applicationDeploymentDescription.getType().setScratchWorkingDirectory(
-                        jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
-                applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
-                applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
-                applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
-                applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
-            }
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+        List<GFacHandlerConfig> handlers = null;
+        if(gFacConfiguration != null){
+         handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+        }else {
+            try {
+                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
             } catch (Exception e) {
-            throw new GFacException(e);
+                log.error("Error constructing job execution context during outhandler invocation");
+                throw new GFacException(e);
+            }
+            schedule(jobExecutionContext);
         }
-
-        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-
         for (GFacHandlerConfig handlerClassName : handlers) {
             Class<? extends GFacHandler> handlerClass;
             GFacHandler handler;

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
index c022bef..a01dcba 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.monitor;
 
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -62,6 +63,8 @@ public class MonitorID {
 
     private JobState state;
 
+    private JobExecutionContext jobExecutionContext;
+
     public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
         this.host = host;
         this.jobStartedTime = new Timestamp((new Date()).getTime());
@@ -215,7 +218,15 @@ public class MonitorID {
 		this.workflowNodeID = workflowNodeID;
 	}
 
-//	public String getWorkflowNodeID() {
+    public JobExecutionContext getJobExecutionContext() {
+        return jobExecutionContext;
+    }
+
+    public void setJobExecutionContext(JobExecutionContext jobExecutionContext) {
+        this.jobExecutionContext = jobExecutionContext;
+    }
+
+    //	public String getWorkflowNodeID() {
 //		return workflowNodeID;
 //	}
 //

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
index 0b6619e..789d188 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
@@ -33,7 +33,9 @@ import net.schmizz.sshj.transport.TransportException;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.security.GSISecurityContext;
 import org.apache.airavata.gfac.context.security.SSHSecurityContext;
@@ -42,10 +44,13 @@ import org.apache.airavata.gfac.util.GFACSSHUtils;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gfac.utils.OutputUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.util.SSHUtils;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail;
 import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.URIParameterType;
 import org.apache.xmlbeans.XmlException;
@@ -56,7 +61,36 @@ public class SCPOutputHandler extends AbstractHandler{
     private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
-         if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+        if(jobExecutionContext.getTaskData().getJobDetailsListSize() == 0) { // this is because we don't have the right jobexecution context
+            // so attempt to load the task details (and their job details) from the registry
+            log.warn("During the out handler chain jobExecution context came null, so trying to handler");
+            ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+            TaskDetails taskData = null;
+            try {
+                taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+            } catch (RegistryException e) {
+                log.error("Error retrieving job details from Registry");
+                throw new GFacHandlerException("Error retrieving job details from Registry", e);
+            }
+            JobDetails jobDetails = taskData.getJobDetailsList().get(0);
+            String jobDescription = jobDetails.getJobDescription();
+            if (jobDescription != null) {
+                JobDescriptor jobDescriptor = null;
+                try {
+                    jobDescriptor = JobDescriptor.fromXML(jobDescription);
+                } catch (XmlException e1) {
+                    e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+                applicationDeploymentDescription.getType().setScratchWorkingDirectory(
+                        jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
+                applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
+                applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
+                applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
+                applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
+            }
+        }
+
+        if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
             try {
                 GFACSSHUtils.addSecurityContext(jobExecutionContext);
             } catch (ApplicationSettingsException e) {
@@ -64,11 +98,11 @@ public class SCPOutputHandler extends AbstractHandler{
                 throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
             }
         }
-    	super.invoke(jobExecutionContext);
+        super.invoke(jobExecutionContext);
         DataTransferDetails detail = new DataTransferDetails();
         TransferStatus status = new TransferStatus();
 
-    	ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
                 .getApplicationDeploymentDescription().getType();
         try {
             Cluster cluster = null;
@@ -94,10 +128,10 @@ public class SCPOutputHandler extends AbstractHandler{
             if (taskData.getAdvancedOutputDataHandling() != null) {
                 outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
             }
-            if(outputDataDir == null) {
+            if (outputDataDir == null) {
                 outputDataDir = File.separator + "tmp";
             }
-            outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" +jobExecutionContext.getTaskData().getTaskID();
+            outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
             (new File(outputDataDir)).mkdirs();
 
 
@@ -114,35 +148,35 @@ public class SCPOutputHandler extends AbstractHandler{
             status.setTransferState(TransferState.COMPLETE);
             detail.setTransferStatus(status);
             detail.setTransferDescription("STDOUT:" + stdOutStr);
-            registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
 
             status.setTransferState(TransferState.COMPLETE);
             detail.setTransferStatus(status);
             detail.setTransferDescription("STDERR:" + stdErrStr);
-            registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
 
 
             Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
             Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
             Set<String> keys = output.keySet();
             for (String paramName : keys) {
-            ActualParameter actualParameter = (ActualParameter) output.get(paramName);
-            if ("URI".equals(actualParameter.getType().getType().toString())) {
-
-                List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
-                if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
-                    stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+                ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+                if ("URI".equals(actualParameter.getType().getType().toString())) {
+
+                    List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
+                    if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+                        stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+                    } else {
+                        String valueList = outputList.get(0);
+                        cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
+                        jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
+                        ((URIParameterType) actualParameter.getType()).setValue(valueList);
+                        stringMap = new HashMap<String, ActualParameter>();
+                        stringMap.put(paramName, actualParameter);
+                    }
                 } else {
-					String valueList = outputList.get(0);
-                    cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
-                    jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
-					((URIParameterType) actualParameter.getType()).setValue(valueList);
-					stringMap = new HashMap<String, ActualParameter>();
-					stringMap.put(paramName, actualParameter);
-				}
-			}else{
-				 stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
-			}
+                    stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+                }
             }
             if (stringMap == null || stringMap.isEmpty()) {
                 throw new GFacHandlerException(
@@ -151,7 +185,7 @@ public class SCPOutputHandler extends AbstractHandler{
             }
             status.setTransferState(TransferState.DOWNLOAD);
             detail.setTransferStatus(status);
-            registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
 
             app.setStandardError(localStdErrFile.getAbsolutePath());
             app.setStandardOutput(localStdOutFile.getAbsolutePath());
@@ -165,14 +199,14 @@ public class SCPOutputHandler extends AbstractHandler{
         } catch (IOException e) {
             throw new GFacHandlerException(e.getMessage(), e);
         } catch (Exception e) {
-        	 try {
-         	    status.setTransferState(TransferState.FAILED);
- 				detail.setTransferStatus(status);
- 				registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- 				GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE,  jobExecutionContext.getTaskData().getTaskID());
-  			} catch (Exception e1) {
-  			    throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
-  		   }
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
             throw new GFacHandlerException("Error in retrieving results", e);
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index 3341d9b..53b9206 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -58,7 +58,7 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
     }
 
 
-    public String submit(String experimentID, String taskID) throws OrchestratorException {
+    public JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException {
         JobExecutionContext jobExecutionContext;
         try {
              jobExecutionContext = gfac.submitJob(experimentID, taskID);
@@ -67,7 +67,7 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
             logger.error(error);
             throw new OrchestratorException(error);
         }
-        return jobExecutionContext.getJobDetails().getJobID();
+        return jobExecutionContext;
     }
 
     public GFac getGfac() {
@@ -86,9 +86,9 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
         this.orchestratorContext = orchestratorContext;
     }
 
-    public void runAfterJobTask(String experimentID, String taskID) throws OrchestratorException {
+    public void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException {
         try {
-            gfac.invokeOutFlowHandlers(experimentID,taskID);
+            gfac.invokeOutFlowHandlers(jobExecutionContext);
         } catch (GFacException e) {
             throw new OrchestratorException(e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index 1c3c934..cf8642f 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -22,6 +22,7 @@ package org.apache.airavata.orchestrator.core.job;
 
 import java.util.List;
 
+import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
@@ -50,15 +51,14 @@ public interface JobSubmitter {
      * just get the request data and do the submission
      * @param experimentID experimentID cannot be null
      * @param taskID taskID cannot be null
-     * @return jobID return the jobID from GFac
+     * @return JobExecutionContext return the jobExecutionContext from GFac
      */
-    String submit(String experimentID, String taskID) throws OrchestratorException;
+    JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException;
 
     /**
      * This can be use to handle any after Jobsubmission task
-     * @param experimentID
-     * @param taskID
+     * @param jobExecutionContext
      * @throws OrchestratorException
      */
-    void runAfterJobTask(String experimentID,String taskID) throws OrchestratorException;
+    void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index fa3b5f3..ca55169 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.gfac.monitor.AbstractActivityListener;
@@ -121,7 +122,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
                     Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
                 MonitorID monitorID = new MonitorID(hostDescription, null, taskId, workflowNodeId, experimentId, userName);
                 monitorManager.addAJobToMonitor(monitorID);
-                jobSubmitter.submit(experimentId, taskId);
+                JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
                 if ("none".equals(jobID)) {
                     logger.error("Job submission Failed, so we remove the job from monitoring");
 
@@ -132,9 +133,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
                 // Launching job for each task
                 // if the monitoring is pull mode then we add the monitorID for each task after submitting
                 // the job with the jobID, otherwise we don't need the jobID
-                jobSubmitter.submit(experimentId, taskId);
+                JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
                 logger.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
                 MonitorID monitorID = new MonitorID(hostDescription, jobID, taskId, workflowNodeId, experimentId, userName, authenticationInfo);
+                monitorID.setJobExecutionContext(jobExecutionContext);
                 if ("none".equals(jobID)) {
                     logger.error("Job submission Failed, so we remove the job from monitoring");
 
@@ -187,7 +189,16 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
     public void handlePostExperimentTask(JobStatusChangeRequest status) throws OrchestratorException {
         if(status.getState() == JobState.COMPLETE){
             MonitorID monitorID = status.getMonitorID();
-            jobSubmitter.runAfterJobTask(monitorID.getExperimentID(), monitorID.getTaskID());
+            if(monitorID.getJobExecutionContext() == null){
+                // Handles the AMQP scenario, where the monitorID has no job execution context; a minimal one is
+                // built here for the output handler to complete. NOTE(review): it is never set on monitorID or passed on.
+                String experimentID = monitorID.getExperimentID();
+                String taskID = monitorID.getTaskID();
+                JobExecutionContext jobExecutionContext = new JobExecutionContext(null, null);
+                jobExecutionContext.setExperimentID(experimentID);
+                jobExecutionContext.setTaskData(new TaskDetails(taskID));
+            }
+            jobSubmitter.runAfterJobTask(monitorID.getJobExecutionContext());
         }
     }
     public ExecutorService getExecutor() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
deleted file mode 100644
index ef03fbc..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
-This is the object which contains the data to identify a particular
-Job to start the monitoring
-*/
-public class MonitorID {
-    private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
-
-    private String userName;
-
-    private Timestamp jobStartedTime;
-
-    private Timestamp lastMonitored;
-
-    private HostDescription host;
-
-    private AuthenticationInfo authenticationInfo = null;
-
-    private Map<String, Object> parameters;
-
-    private String experimentID;
-
-    private String workflowNodeID;
-
-    private String taskID;
-
-    private String jobID;
-
-    private int failedCount = 0;
-
-    private JobState state;
-
-    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
-        this(host, jobID, taskID, workflowNodeID, experimentID, userName, null);
-    }
-
-    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
-        this.host = host;
-        this.jobStartedTime = new Timestamp((new Date()).getTime());
-        this.authenticationInfo = authenticationInfo;
-        this.userName = userName;
-        // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
-        if(this.authenticationInfo != null){
-            if(this.authenticationInfo instanceof MyProxyAuthenticationInfo){
-                this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
-            }
-        }
-        this.jobID = jobID;
-        this.taskID = taskID;
-        this.workflowNodeID=workflowNodeID;
-        this.experimentID = experimentID;
-    }
-    public HostDescription getHost() {
-        return host;
-    }
-
-    public void setHost(HostDescription host) {
-        this.host = host;
-    }
-
-    public Timestamp getLastMonitored() {
-        return lastMonitored;
-    }
-
-    public void setLastMonitored(Timestamp lastMonitored) {
-        this.lastMonitored = lastMonitored;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getJobID() {
-        return jobID;
-    }
-
-    public void setJobID(String jobID) {
-        this.jobID = jobID;
-    }
-
-    public Timestamp getJobStartedTime() {
-        return jobStartedTime;
-    }
-
-    public void setJobStartedTime(Timestamp jobStartedTime) {
-        this.jobStartedTime = jobStartedTime;
-    }
-
-    public AuthenticationInfo getAuthenticationInfo() {
-        return authenticationInfo;
-    }
-
-    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
-        this.authenticationInfo = authenticationInfo;
-    }
-
-    public void addParameter(String key,Object value) {
-        this.parameters.put(key, value);
-    }
-
-    public Object getParameter(String key) {
-        return this.parameters.get(key);
-    }
-
-    public Map<String, Object> getParameters() {
-        return parameters;
-    }
-
-    public void setParameters(Map<String, Object> parameters) {
-        this.parameters = parameters;
-    }
-
-    public String getExperimentID() {
-        return experimentID;
-    }
-
-    public void setExperimentID(String experimentID) {
-        this.experimentID = experimentID;
-    }
-
-    public String getTaskID() {
-        return taskID;
-    }
-
-    public void setTaskID(String taskID) {
-        this.taskID = taskID;
-    }
-
-    public int getFailedCount() {
-        return failedCount;
-    }
-
-    public void setFailedCount(int failedCount) {
-        this.failedCount = failedCount;
-    }
-
-    public JobState getStatus() {
-        return state;
-    }
-
-    public void setStatus(JobState status) {
-        // this logic is going to be useful for fast finishing jobs
-        // because in some machines job state vanishes quicckly when the job is done
-        // during that case job state comes as unknown.so we handle it here.
-            if (this.state != null && status.equals(JobState.UNKNOWN)) {
-                if (getFailedCount() > 2) {
-                    switch (this.state) {
-                        case ACTIVE:
-                            this.state = JobState.COMPLETE;
-                            break;
-                        case QUEUED:
-                            this.state = JobState.COMPLETE;
-                            break;
-                    }
-                } else {
-                    try {
-                        // when state becomes unknown we sleep for a while
-                        Thread.sleep(10000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                    setFailedCount(getFailedCount() + 1);
-                }
-            } else {
-                // normal scenario
-                this.state = status;
-            }
-    }
-
-	public String getWorkflowNodeID() {
-		return workflowNodeID;
-	}
-
-	public void setWorkflowNodeID(String workflowNodeID) {
-		this.workflowNodeID = workflowNodeID;
-	}
-
-//	public String getWorkflowNodeID() {
-//		return workflowNodeID;
-//	}
-//
-//	public void setWorkflowNodeID(String workflowNodeID) {
-//		this.workflowNodeID = workflowNodeID;
-//	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index 0bdcea2..8f188ad 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -30,7 +30,6 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
             <modules>
-                <module>phoebus-integration</module>
 		<module>registry-tool</module>
 		<module>gsissh</module>
         </modules>