You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/04 20:50:42 UTC

airavata git commit: Moved job model from ProcessContext to TaskContext

Repository: airavata
Updated Branches:
  refs/heads/master be08320dd -> 98f077ed5


Moved job model from ProcessContext to TaskContext


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/98f077ed
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/98f077ed
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/98f077ed

Branch: refs/heads/master
Commit: 98f077ed5a9f007b834b6efd98b84f5aea750d4d
Parents: be08320
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 4 14:50:35 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 4 14:50:35 2015 -0500

----------------------------------------------------------------------
 .../gfac/core/context/ProcessContext.java       | 15 ------------
 .../airavata/gfac/core/context/TaskContext.java | 24 +++++++++++++++++++-
 .../airavata/gfac/impl/GFacEngineImpl.java      | 24 +++++++-------------
 .../impl/task/SSHForkJobSubmissionTask.java     | 12 +++++-----
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 10 ++++----
 .../gfac/monitor/email/EmailBasedMonitor.java   |  2 +-
 6 files changed, 42 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 1a3a236..3a1a8d1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -75,7 +75,6 @@ public class ProcessContext {
 	private String stderrLocation;
 	private JobSubmissionProtocol jobSubmissionProtocol;
 	private DataMovementProtocol dataMovementProtocol;
-	private JobModel jobModel;
 	private ComputeResourcePreference computeResourcePreference;
 	private MonitorMode monitorMode;
 	private ResourceJobManager resourceJobManager;
@@ -317,20 +316,6 @@ public class ProcessContext {
         return taskMap;
     }
 
-	public JobModel getJobModel() {
-		if (jobModel == null) {
-			jobModel = new JobModel();
-			jobModel.setProcessId(processId);
-			jobModel.setWorkingDir(getWorkingDir());
-			jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-		}
-		return jobModel;
-	}
-
-	public void setJobModel(JobModel jobModel) {
-		this.jobModel = jobModel;
-	}
-
 	public ComputeResourcePreference getComputeResourcePreference() {
 		return computeResourcePreference;
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index ae92ba1..156ada0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,9 +20,12 @@
  */
 package org.apache.airavata.gfac.core.context;
 
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskModel;
@@ -39,8 +42,9 @@ public class TaskContext {
     private InputDataObjectType processInput;
     private OutputDataObjectType processOutput;
     private Object subTaskModel = null;
+    private JobModel jobModel;
 
-	public TaskModel getTaskModel() {
+    public TaskModel getTaskModel() {
 		return taskModel;
 	}
 
@@ -117,4 +121,22 @@ public class TaskContext {
         }
         return subTaskModel;
     }
+
+    public RemoteCluster getRemoteCluster() {
+        return getParentProcessContext().getRemoteCluster();
+    }
+
+    public JobModel getJobModel() {
+        if (jobModel == null) {
+            jobModel = new JobModel();
+            jobModel.setProcessId(getProcessId());
+            jobModel.setWorkingDir(getWorkingDir());
+            jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+        }
+        return jobModel;
+    }
+
+    public void setJobModel(JobModel jobModel) {
+        this.jobModel = jobModel;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 4b67ffd..d160a5a 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -136,13 +136,6 @@ public class GFacEngineImpl implements GFacEngine {
                         processContext.getProcessId());
             }
 
-            List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
-            if (jobModels != null && !jobModels.isEmpty()) {
-                if (jobModels.size() > 1) {
-                    log.warn("Process has more than one job model, take first one");
-                }
-                processContext.setJobModel(((JobModel) jobModels.get(0)));
-            }
             return processContext;
         } catch (AppCatalogException e) {
             throw new GFacException("App catalog access exception ", e);
@@ -295,10 +288,10 @@ public class GFacEngineImpl implements GFacEngine {
 
             MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
             monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
-            if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
-                monitorService.monitor(processContext.getJobModel().getJobId(), taskContext);
+            if (!monitorService.isMonitoring(taskContext.getJobModel().getJobId())) {
+                monitorService.monitor(taskContext.getJobModel().getJobId(), taskContext);
             } else {
-                log.warn("Jobid: {}, already in monitoring map", processContext.getJobModel().getJobId());
+                log.warn("Jobid: {}, already in monitoring map", taskContext.getJobModel().getJobId());
             }
         } catch (AiravataException | TException e) {
             taskStatus = new TaskStatus(TaskState.FAILED);
@@ -307,8 +300,8 @@ public class GFacEngineImpl implements GFacEngine {
             taskContext.setTaskStatus(taskStatus);
             GFacUtils.saveAndPublishTaskStatus(taskContext);
 
-            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
-                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+            String errorMsg = new StringBuilder("expId: ").append(taskContext.getExperimentId()).append(", processId: ")
+                    .append(taskContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
                     .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
                     .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
@@ -557,13 +550,12 @@ public class GFacEngineImpl implements GFacEngine {
 
             if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
                 JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
-                monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
+                monitorService.stopMonitor(taskContext.getJobModel().getJobId(), true);
                 JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
                 newJobStatus.setReason("Job cancelled");
                 newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
-                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
-                        .getJobModel());
+                taskContext.getJobModel().setJobStatus(newJobStatus);
+                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getJobModel());
             }
         } catch (TaskException e) {
             throw new GFacException("Error while cancelling job");

http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
index 3a14b2e..cde3032 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
@@ -60,9 +60,9 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
         try {
             ProcessContext processContext = taskContext.getParentProcessContext();
-            JobModel jobModel = processContext.getJobModel();
+            JobModel jobModel = taskContext.getJobModel();
             jobModel.setTaskId(taskContext.getTaskId());
-            RemoteCluster remoteCluster = processContext.getRemoteCluster();
+            RemoteCluster remoteCluster = taskContext.getRemoteCluster();
             JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
             jobModel.setJobName(jobDescriptor.getJobName());
             ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
@@ -75,7 +75,7 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
 	        if (jobFile != null && jobFile.exists()) {
                 jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
 	            JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
-			            processContext.getWorkingDir());
+			            taskContext.getWorkingDir());
 	            jobModel.setExitCode(jobSubmissionOutput.getExitCode());
 	            jobModel.setStdErr(jobSubmissionOutput.getStdErr());
 	            jobModel.setStdOut(jobSubmissionOutput.getStdOut());
@@ -84,8 +84,8 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
                     jobModel.setJobId(jobId);
                     GFacUtils.saveJobModel(processContext, jobModel);
                     jobStatus.setJobState(JobState.SUBMITTED);
-                    jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
-                            .getComputeResourceDescription().getHostName());
+                    jobStatus.setReason("Successfully Submitted to "
+                            + processContext.getComputeResourceDescription().getHostName());
                     jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     jobModel.setJobStatus(jobStatus);
                     GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
@@ -93,7 +93,7 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
                     taskStatus.setReason("Submitted job to compute resource");
                 }
                 if (jobId == null || jobId.isEmpty()) {
-                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                    String msg = "expId:" + taskContext.getExperimentId() + " Couldn't find " +
                             "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
                             "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
                     log.error(msg);

http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 4cc041c..ee02312 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -60,7 +60,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
 	    try {
 		    ProcessContext processContext = taskContext.getParentProcessContext();
-		    JobModel jobModel = processContext.getJobModel();
+		    JobModel jobModel = taskContext.getJobModel();
 		    jobModel.setTaskId(taskContext.getTaskId());
 		    RemoteCluster remoteCluster = processContext.getRemoteCluster();
 		    JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
@@ -239,8 +239,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
     @Override
     public TaskStatus recover(TaskContext taskContext) {
-            ProcessContext processContext = taskContext.getParentProcessContext();
-            JobModel jobModel = processContext.getJobModel();
+            JobModel jobModel = taskContext.getJobModel();
             // original job failed before submitting
             if (jobModel == null || jobModel.getJobId() == null ){
                 return execute(taskContext);
@@ -257,9 +256,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
 	@Override
 	public JobStatus cancel(TaskContext taskcontext) throws TaskException {
-		ProcessContext processContext = taskcontext.getParentProcessContext();
-		RemoteCluster remoteCluster = processContext.getRemoteCluster();
-		JobModel jobModel = processContext.getJobModel();
+		RemoteCluster remoteCluster = taskcontext.getRemoteCluster();
+		JobModel jobModel = taskcontext.getJobModel();
 		int retryCount = 0;
 		if (jobModel != null) {
 			try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/98f077ed/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index f983d63..ed4c9ac 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -302,7 +302,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	    // TODO : update job state on process context
         boolean runOutflowTasks = false;
 	    JobStatus jobStatus = new JobStatus();
-	    JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
+	    JobModel jobModel = taskContext.getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {