You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/04 20:50:42 UTC
airavata git commit: Moved job model to taskContext from
Processcontext
Repository: airavata
Updated Branches:
refs/heads/master be08320dd -> 98f077ed5
Moved job model to taskContext from Processcontext
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/98f077ed
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/98f077ed
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/98f077ed
Branch: refs/heads/master
Commit: 98f077ed5a9f007b834b6efd98b84f5aea750d4d
Parents: be08320
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 4 14:50:35 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 4 14:50:35 2015 -0500
----------------------------------------------------------------------
.../gfac/core/context/ProcessContext.java | 15 ------------
.../airavata/gfac/core/context/TaskContext.java | 24 +++++++++++++++++++-
.../airavata/gfac/impl/GFacEngineImpl.java | 24 +++++++-------------
.../impl/task/SSHForkJobSubmissionTask.java | 12 +++++-----
.../gfac/impl/task/SSHJobSubmissionTask.java | 10 ++++----
.../gfac/monitor/email/EmailBasedMonitor.java | 2 +-
6 files changed, 42 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 1a3a236..3a1a8d1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -75,7 +75,6 @@ public class ProcessContext {
private String stderrLocation;
private JobSubmissionProtocol jobSubmissionProtocol;
private DataMovementProtocol dataMovementProtocol;
- private JobModel jobModel;
private ComputeResourcePreference computeResourcePreference;
private MonitorMode monitorMode;
private ResourceJobManager resourceJobManager;
@@ -317,20 +316,6 @@ public class ProcessContext {
return taskMap;
}
- public JobModel getJobModel() {
- if (jobModel == null) {
- jobModel = new JobModel();
- jobModel.setProcessId(processId);
- jobModel.setWorkingDir(getWorkingDir());
- jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- }
- return jobModel;
- }
-
- public void setJobModel(JobModel jobModel) {
- this.jobModel = jobModel;
- }
-
public ComputeResourcePreference getComputeResourcePreference() {
return computeResourcePreference;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index ae92ba1..156ada0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,9 +20,12 @@
*/
package org.apache.airavata.gfac.core.context;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskModel;
@@ -39,8 +42,9 @@ public class TaskContext {
private InputDataObjectType processInput;
private OutputDataObjectType processOutput;
private Object subTaskModel = null;
+ private JobModel jobModel;
- public TaskModel getTaskModel() {
+ public TaskModel getTaskModel() {
return taskModel;
}
@@ -117,4 +121,22 @@ public class TaskContext {
}
return subTaskModel;
}
+
+ public RemoteCluster getRemoteCluster() {
+ return getParentProcessContext().getRemoteCluster();
+ }
+
+ public JobModel getJobModel() {
+ if (jobModel == null) {
+ jobModel = new JobModel();
+ jobModel.setProcessId(getProcessId());
+ jobModel.setWorkingDir(getWorkingDir());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ }
+ return jobModel;
+ }
+
+ public void setJobModel(JobModel jobModel) {
+ this.jobModel = jobModel;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 4b67ffd..d160a5a 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -136,13 +136,6 @@ public class GFacEngineImpl implements GFacEngine {
processContext.getProcessId());
}
- List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
- if (jobModels != null && !jobModels.isEmpty()) {
- if (jobModels.size() > 1) {
- log.warn("Process has more than one job model, take first one");
- }
- processContext.setJobModel(((JobModel) jobModels.get(0)));
- }
return processContext;
} catch (AppCatalogException e) {
throw new GFacException("App catalog access exception ", e);
@@ -295,10 +288,10 @@ public class GFacEngineImpl implements GFacEngine {
MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
- if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
- monitorService.monitor(processContext.getJobModel().getJobId(), taskContext);
+ if (!monitorService.isMonitoring(taskContext.getJobModel().getJobId())) {
+ monitorService.monitor(taskContext.getJobModel().getJobId(), taskContext);
} else {
- log.warn("Jobid: {}, already in monitoring map", processContext.getJobModel().getJobId());
+ log.warn("Jobid: {}, already in monitoring map", taskContext.getJobModel().getJobId());
}
} catch (AiravataException | TException e) {
taskStatus = new TaskStatus(TaskState.FAILED);
@@ -307,8 +300,8 @@ public class GFacEngineImpl implements GFacEngine {
taskContext.setTaskStatus(taskStatus);
GFacUtils.saveAndPublishTaskStatus(taskContext);
- String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
- .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ String errorMsg = new StringBuilder("expId: ").append(taskContext.getExperimentId()).append(", processId: ")
+ .append(taskContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
.append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
.append(taskStatus.getReason()).toString();
ErrorModel errorModel = new ErrorModel();
@@ -557,13 +550,12 @@ public class GFacEngineImpl implements GFacEngine {
if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
- monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
+ monitorService.stopMonitor(taskContext.getJobModel().getJobId(), true);
JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
newJobStatus.setReason("Job cancelled");
newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
- .getJobModel());
+ taskContext.getJobModel().setJobStatus(newJobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getJobModel());
}
} catch (TaskException e) {
throw new GFacException("Error while cancelling job");
http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
index 3a14b2e..cde3032 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
@@ -60,9 +60,9 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
try {
ProcessContext processContext = taskContext.getParentProcessContext();
- JobModel jobModel = processContext.getJobModel();
+ JobModel jobModel = taskContext.getJobModel();
jobModel.setTaskId(taskContext.getTaskId());
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ RemoteCluster remoteCluster = taskContext.getRemoteCluster();
JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
jobModel.setJobName(jobDescriptor.getJobName());
ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
@@ -75,7 +75,7 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
if (jobFile != null && jobFile.exists()) {
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
- processContext.getWorkingDir());
+ taskContext.getWorkingDir());
jobModel.setExitCode(jobSubmissionOutput.getExitCode());
jobModel.setStdErr(jobSubmissionOutput.getStdErr());
jobModel.setStdOut(jobSubmissionOutput.getStdOut());
@@ -84,8 +84,8 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
jobModel.setJobId(jobId);
GFacUtils.saveJobModel(processContext, jobModel);
jobStatus.setJobState(JobState.SUBMITTED);
- jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
- .getComputeResourceDescription().getHostName());
+ jobStatus.setReason("Successfully Submitted to "
+ + processContext.getComputeResourceDescription().getHostName());
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatus(jobStatus);
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
@@ -93,7 +93,7 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
taskStatus.setReason("Submitted job to compute resource");
}
if (jobId == null || jobId.isEmpty()) {
- String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+ String msg = "expId:" + taskContext.getExperimentId() + " Couldn't find " +
"remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
"doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
log.error(msg);
http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 4cc041c..ee02312 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -60,7 +60,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
try {
ProcessContext processContext = taskContext.getParentProcessContext();
- JobModel jobModel = processContext.getJobModel();
+ JobModel jobModel = taskContext.getJobModel();
jobModel.setTaskId(taskContext.getTaskId());
RemoteCluster remoteCluster = processContext.getRemoteCluster();
JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
@@ -239,8 +239,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
@Override
public TaskStatus recover(TaskContext taskContext) {
- ProcessContext processContext = taskContext.getParentProcessContext();
- JobModel jobModel = processContext.getJobModel();
+ JobModel jobModel = taskContext.getJobModel();
// original job failed before submitting
if (jobModel == null || jobModel.getJobId() == null ){
return execute(taskContext);
@@ -257,9 +256,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
@Override
public JobStatus cancel(TaskContext taskcontext) throws TaskException {
- ProcessContext processContext = taskcontext.getParentProcessContext();
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
- JobModel jobModel = processContext.getJobModel();
+ RemoteCluster remoteCluster = taskcontext.getRemoteCluster();
+ JobModel jobModel = taskcontext.getJobModel();
int retryCount = 0;
if (jobModel != null) {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index f983d63..ed4c9ac 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -302,7 +302,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
// TODO : update job state on process context
boolean runOutflowTasks = false;
JobStatus jobStatus = new JobStatus();
- JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
+ JobModel jobModel = taskContext.getJobModel();
String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
// TODO - Handle all other valid JobStates
if (resultState == JobState.COMPLETE) {