You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/23 22:47:35 UTC
git commit: fixing monitoring issue with async submission
Repository: airavata
Updated Branches:
refs/heads/master b02308493 -> 34a8147e4
fixing monitoring issue with async submission
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/34a8147e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/34a8147e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/34a8147e
Branch: refs/heads/master
Commit: 34a8147e4a6980d6fbbc59f7a8440d6797da9e1a
Parents: b023084
Author: lahiru <la...@apache.org>
Authored: Wed Apr 23 16:47:21 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Apr 23 16:47:21 2014 -0400
----------------------------------------------------------------------
modules/distribution/server/pom.xml | 5 +
.../server/src/main/assembly/bin-assembly.xml | 1 +
.../java/org/apache/airavata/gfac/cpi/GFac.java | 5 +-
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 34 +--
.../apache/airavata/gfac/monitor/MonitorID.java | 13 +-
.../airavata/gfac/handler/SCPOutputHandler.java | 98 +++++---
.../core/impl/EmbeddedGFACJobSubmitter.java | 8 +-
.../orchestrator/core/job/JobSubmitter.java | 10 +-
.../cpi/impl/SimpleOrchestratorImpl.java | 17 +-
.../apache/airavata/job/monitor/MonitorID.java | 221 -------------------
tools/pom.xml | 1 -
11 files changed, 121 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 0ff7f4e..ad6d07c 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -286,6 +286,11 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-gfac-ssh</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
<artifactId>airavata-gfac-core</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 05f4475..4a76cd2 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -196,6 +196,7 @@
<include>org.apache.airavata:airavata-data-models:jar</include>
<include>org.apache.airavata:airavata-credential-store:jar</include>
<include>org.apache.airavata:airavata-gfac-core:jar</include>
+ <include>org.apache.airavata:airavata-gfac-ssh:jar</include>
<include>org.apache.airavata:airavata-client-api:jar</include>
<include>org.apache.airavata:airavata-message-monitor:jar</include>
<include>org.apache.airavata:airavata-workflow-model-core:jar</include>
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
index d18df07..fc5fd19 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
@@ -41,10 +41,9 @@ public interface GFac {
/**
* This method has to be invoked after submitting the job and have to make sure job is properly finished
- * @param experimentID
- * @param taskID
+ * @param jobExecutionContext
* @throws GFacException
*/
- public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException;
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 14ea519..6312292 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -309,30 +309,20 @@ public class GFacImpl implements GFac, AbstractActivityListener {
}
}
- public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException {
- JobExecutionContext jobExecutionContext = null;
- try {
- jobExecutionContext = createJEC(experimentID, taskID);
- Scheduler.schedule(jobExecutionContext);
- ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
- TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID);
- JobDetails jobDetails = taskData.getJobDetailsList().get(0);
- String jobDescription = jobDetails.getJobDescription();
- if(jobDescription != null) {
- JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDescription);
- applicationDeploymentDescription.getType().setScratchWorkingDirectory(
- jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
- applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
- applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
- applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
- applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
- }
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if(gFacConfiguration != null){
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ }else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
} catch (Exception e) {
- throw new GFacException(e);
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ schedule(jobExecutionContext);
}
-
- List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-
for (GFacHandlerConfig handlerClassName : handlers) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
index c022bef..a01dcba 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.monitor;
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -62,6 +63,8 @@ public class MonitorID {
private JobState state;
+ private JobExecutionContext jobExecutionContext;
+
public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
this.host = host;
this.jobStartedTime = new Timestamp((new Date()).getTime());
@@ -215,7 +218,15 @@ public class MonitorID {
this.workflowNodeID = workflowNodeID;
}
-// public String getWorkflowNodeID() {
+ public JobExecutionContext getJobExecutionContext() {
+ return jobExecutionContext;
+ }
+
+ public void setJobExecutionContext(JobExecutionContext jobExecutionContext) {
+ this.jobExecutionContext = jobExecutionContext;
+ }
+
+ // public String getWorkflowNodeID() {
// return workflowNodeID;
// }
//
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
index 0b6619e..789d188 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
@@ -33,7 +33,9 @@ import net.schmizz.sshj.transport.TransportException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
@@ -42,10 +44,13 @@ import org.apache.airavata.gfac.util.GFACSSHUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.util.SSHUtils;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail;
import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.URIParameterType;
import org.apache.xmlbeans.XmlException;
@@ -56,7 +61,36 @@ public class SCPOutputHandler extends AbstractHandler{
private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ if(jobExecutionContext.getTaskData().getJobDetailsListSize() == 0) { // this is because we don't have the right jobexecution context
+ // so attempting to get it from the registry
+ log.warn("During the out handler chain jobExecution context came null, so trying to handler");
+ ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+ TaskDetails taskData = null;
+ try {
+ taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+ } catch (RegistryException e) {
+ log.error("Error retrieving job details from Registry");
+ throw new GFacHandlerException("Error retrieving job details from Registry", e);
+ }
+ JobDetails jobDetails = taskData.getJobDetailsList().get(0);
+ String jobDescription = jobDetails.getJobDescription();
+ if (jobDescription != null) {
+ JobDescriptor jobDescriptor = null;
+ try {
+ jobDescriptor = JobDescriptor.fromXML(jobDescription);
+ } catch (XmlException e1) {
+ e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ applicationDeploymentDescription.getType().setScratchWorkingDirectory(
+ jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
+ applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
+ applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
+ applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
+ applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
+ }
+ }
+
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
try {
GFACSSHUtils.addSecurityContext(jobExecutionContext);
} catch (ApplicationSettingsException e) {
@@ -64,11 +98,11 @@ public class SCPOutputHandler extends AbstractHandler{
throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
}
- super.invoke(jobExecutionContext);
+ super.invoke(jobExecutionContext);
DataTransferDetails detail = new DataTransferDetails();
TransferStatus status = new TransferStatus();
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
.getApplicationDeploymentDescription().getType();
try {
Cluster cluster = null;
@@ -94,10 +128,10 @@ public class SCPOutputHandler extends AbstractHandler{
if (taskData.getAdvancedOutputDataHandling() != null) {
outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
}
- if(outputDataDir == null) {
+ if (outputDataDir == null) {
outputDataDir = File.separator + "tmp";
}
- outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" +jobExecutionContext.getTaskData().getTaskID();
+ outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
(new File(outputDataDir)).mkdirs();
@@ -114,35 +148,35 @@ public class SCPOutputHandler extends AbstractHandler{
status.setTransferState(TransferState.COMPLETE);
detail.setTransferStatus(status);
detail.setTransferDescription("STDOUT:" + stdOutStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
status.setTransferState(TransferState.COMPLETE);
detail.setTransferStatus(status);
detail.setTransferDescription("STDERR:" + stdErrStr);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
Set<String> keys = output.keySet();
for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) output.get(paramName);
- if ("URI".equals(actualParameter.getType().getType().toString())) {
-
- List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
- if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+
+ List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
+ if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ } else {
+ String valueList = outputList.get(0);
+ cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
+ jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
+ ((URIParameterType) actualParameter.getType()).setValue(valueList);
+ stringMap = new HashMap<String, ActualParameter>();
+ stringMap.put(paramName, actualParameter);
+ }
} else {
- String valueList = outputList.get(0);
- cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
- jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
- ((URIParameterType) actualParameter.getType()).setValue(valueList);
- stringMap = new HashMap<String, ActualParameter>();
- stringMap.put(paramName, actualParameter);
- }
- }else{
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- }
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ }
}
if (stringMap == null || stringMap.isEmpty()) {
throw new GFacHandlerException(
@@ -151,7 +185,7 @@ public class SCPOutputHandler extends AbstractHandler{
}
status.setTransferState(TransferState.DOWNLOAD);
detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
app.setStandardError(localStdErrFile.getAbsolutePath());
app.setStandardOutput(localStdOutFile.getAbsolutePath());
@@ -165,14 +199,14 @@ public class SCPOutputHandler extends AbstractHandler{
} catch (IOException e) {
throw new GFacHandlerException(e.getMessage(), e);
} catch (Exception e) {
- try {
- status.setTransferState(TransferState.FAILED);
- detail.setTransferStatus(status);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
throw new GFacHandlerException("Error in retrieving results", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index 3341d9b..53b9206 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -58,7 +58,7 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
}
- public String submit(String experimentID, String taskID) throws OrchestratorException {
+ public JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException {
JobExecutionContext jobExecutionContext;
try {
jobExecutionContext = gfac.submitJob(experimentID, taskID);
@@ -67,7 +67,7 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
logger.error(error);
throw new OrchestratorException(error);
}
- return jobExecutionContext.getJobDetails().getJobID();
+ return jobExecutionContext;
}
public GFac getGfac() {
@@ -86,9 +86,9 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
this.orchestratorContext = orchestratorContext;
}
- public void runAfterJobTask(String experimentID, String taskID) throws OrchestratorException {
+ public void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException {
try {
- gfac.invokeOutFlowHandlers(experimentID,taskID);
+ gfac.invokeOutFlowHandlers(jobExecutionContext);
} catch (GFacException e) {
throw new OrchestratorException(e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index 1c3c934..cf8642f 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -22,6 +22,7 @@ package org.apache.airavata.orchestrator.core.job;
import java.util.List;
+import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
@@ -50,15 +51,14 @@ public interface JobSubmitter {
* just get the request data and do the submission
* @param experimentID experimentID cannot be null
* @param taskID taskID cannot be null
- * @return jobID return the jobID from GFac
+ * @return JobExecutionContext return the jobExecutionContext from GFac
*/
- String submit(String experimentID, String taskID) throws OrchestratorException;
+ JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException;
/**
* This can be use to handle any after Jobsubmission task
- * @param experimentID
- * @param taskID
+ * @param jobExecutionContext
* @throws OrchestratorException
*/
- void runAfterJobTask(String experimentID,String taskID) throws OrchestratorException;
+ void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index fa3b5f3..ca55169 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.gfac.monitor.AbstractActivityListener;
@@ -121,7 +122,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
MonitorID monitorID = new MonitorID(hostDescription, null, taskId, workflowNodeId, experimentId, userName);
monitorManager.addAJobToMonitor(monitorID);
- jobSubmitter.submit(experimentId, taskId);
+ JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
if ("none".equals(jobID)) {
logger.error("Job submission Failed, so we remove the job from monitoring");
@@ -132,9 +133,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
// Launching job for each task
// if the monitoring is pull mode then we add the monitorID for each task after submitting
// the job with the jobID, otherwise we don't need the jobID
- jobSubmitter.submit(experimentId, taskId);
+ JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
logger.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
MonitorID monitorID = new MonitorID(hostDescription, jobID, taskId, workflowNodeId, experimentId, userName, authenticationInfo);
+ monitorID.setJobExecutionContext(jobExecutionContext);
if ("none".equals(jobID)) {
logger.error("Job submission Failed, so we remove the job from monitoring");
@@ -187,7 +189,16 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
public void handlePostExperimentTask(JobStatusChangeRequest status) throws OrchestratorException {
if(status.getState() == JobState.COMPLETE){
MonitorID monitorID = status.getMonitorID();
- jobSubmitter.runAfterJobTask(monitorID.getExperimentID(), monitorID.getTaskID());
+ if(monitorID.getJobExecutionContext() == null){
+ // This code handles the AMQP scenario, where the monitorID does not carry a
+ // job execution context; in that case it will be created by the output handler.
+ String experimentID = monitorID.getExperimentID();
+ String taskID = monitorID.getTaskID();
+ JobExecutionContext jobExecutionContext = new JobExecutionContext(null, null);
+ jobExecutionContext.setExperimentID(experimentID);
+ jobExecutionContext.setTaskData(new TaskDetails(taskID));
+ }
+ jobSubmitter.runAfterJobTask(monitorID.getJobExecutionContext());
}
}
public ExecutorService getExecutor() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
deleted file mode 100644
index ef03fbc..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
-This is the object which contains the data to identify a particular
-Job to start the monitoring
-*/
-public class MonitorID {
- private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
-
- private String userName;
-
- private Timestamp jobStartedTime;
-
- private Timestamp lastMonitored;
-
- private HostDescription host;
-
- private AuthenticationInfo authenticationInfo = null;
-
- private Map<String, Object> parameters;
-
- private String experimentID;
-
- private String workflowNodeID;
-
- private String taskID;
-
- private String jobID;
-
- private int failedCount = 0;
-
- private JobState state;
-
- public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
- this(host, jobID, taskID, workflowNodeID, experimentID, userName, null);
- }
-
- public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
- this.host = host;
- this.jobStartedTime = new Timestamp((new Date()).getTime());
- this.authenticationInfo = authenticationInfo;
- this.userName = userName;
- // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
- if(this.authenticationInfo != null){
- if(this.authenticationInfo instanceof MyProxyAuthenticationInfo){
- this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
- }
- }
- this.jobID = jobID;
- this.taskID = taskID;
- this.workflowNodeID=workflowNodeID;
- this.experimentID = experimentID;
- }
- public HostDescription getHost() {
- return host;
- }
-
- public void setHost(HostDescription host) {
- this.host = host;
- }
-
- public Timestamp getLastMonitored() {
- return lastMonitored;
- }
-
- public void setLastMonitored(Timestamp lastMonitored) {
- this.lastMonitored = lastMonitored;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getJobID() {
- return jobID;
- }
-
- public void setJobID(String jobID) {
- this.jobID = jobID;
- }
-
- public Timestamp getJobStartedTime() {
- return jobStartedTime;
- }
-
- public void setJobStartedTime(Timestamp jobStartedTime) {
- this.jobStartedTime = jobStartedTime;
- }
-
- public AuthenticationInfo getAuthenticationInfo() {
- return authenticationInfo;
- }
-
- public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
- this.authenticationInfo = authenticationInfo;
- }
-
- public void addParameter(String key,Object value) {
- this.parameters.put(key, value);
- }
-
- public Object getParameter(String key) {
- return this.parameters.get(key);
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- }
-
- public String getExperimentID() {
- return experimentID;
- }
-
- public void setExperimentID(String experimentID) {
- this.experimentID = experimentID;
- }
-
- public String getTaskID() {
- return taskID;
- }
-
- public void setTaskID(String taskID) {
- this.taskID = taskID;
- }
-
- public int getFailedCount() {
- return failedCount;
- }
-
- public void setFailedCount(int failedCount) {
- this.failedCount = failedCount;
- }
-
- public JobState getStatus() {
- return state;
- }
-
- public void setStatus(JobState status) {
- // this logic is going to be useful for fast finishing jobs
- // because in some machines job state vanishes quicckly when the job is done
- // during that case job state comes as unknown.so we handle it here.
- if (this.state != null && status.equals(JobState.UNKNOWN)) {
- if (getFailedCount() > 2) {
- switch (this.state) {
- case ACTIVE:
- this.state = JobState.COMPLETE;
- break;
- case QUEUED:
- this.state = JobState.COMPLETE;
- break;
- }
- } else {
- try {
- // when state becomes unknown we sleep for a while
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- setFailedCount(getFailedCount() + 1);
- }
- } else {
- // normal scenario
- this.state = status;
- }
- }
-
- public String getWorkflowNodeID() {
- return workflowNodeID;
- }
-
- public void setWorkflowNodeID(String workflowNodeID) {
- this.workflowNodeID = workflowNodeID;
- }
-
-// public String getWorkflowNodeID() {
-// return workflowNodeID;
-// }
-//
-// public void setWorkflowNodeID(String workflowNodeID) {
-// this.workflowNodeID = workflowNodeID;
-// }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index 0bdcea2..8f188ad 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -30,7 +30,6 @@
<activeByDefault>true</activeByDefault>
</activation>
<modules>
- <module>phoebus-integration</module>
<module>registry-tool</module>
<module>gsissh</module>
</modules>