You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/05/02 16:12:24 UTC

airavata git commit: Add implementation for LocalJobSubmission

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 702aa50ae -> 9a3617604


Add implementation for LocalJobSubmission


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9a361760
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9a361760
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9a361760

Branch: refs/heads/feature-workload-mgmt
Commit: 9a361760406c1e3c2bf5862d77ad6a0cff139fb3
Parents: 702aa50
Author: Gourav Shenoy <go...@apache.org>
Authored: Tue May 2 12:12:22 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Tue May 2 12:12:22 2017 -0400

----------------------------------------------------------------------
 .../impl/LocalJobSubmissionTask.java            | 38 +++++++++++---------
 1 file changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9a361760/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
index 8bd73a4..804fce2 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
@@ -23,13 +23,6 @@ package org.apache.airavata.worker.task.jobsubmission.impl;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
@@ -42,6 +35,19 @@ import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.worker.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.Script;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +56,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
-public class LocalJobSubmissionTask implements JobSubmissionTask{
+public class LocalJobSubmissionTask implements JobSubmissionTask {
     private static final Logger log = LoggerFactory.getLogger(LocalJobSubmissionTask.class);
     private ProcessBuilder builder;
 
@@ -67,25 +73,25 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
             jobModel.setTaskId(taskContext.getTaskId());
 
             RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
-            GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext,taskContext);
+            GroovyMap groovyMap = JobSubmissionUtils.createGroovyMap(processContext,taskContext);
 
             String jobId = AiravataUtils.getId("JOB_ID_");
             jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
             jobModel.setJobId(jobId);
 
-            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext);
             JobManagerConfiguration jConfig = null;
 
             if (resourceJobManager != null) {
-                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+                jConfig = JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager);
             }
 
             JobStatus jobStatus = new JobStatus();
-            File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+            File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig);
             if (jobFile != null && jobFile.exists()) {
                 jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
 
-                GFacUtils.saveJobModel(processContext, jobModel);
+                JobSubmissionUtils.saveJobModel(processContext, jobModel);
 
                 JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
                         processContext.getWorkingDir());
@@ -96,7 +102,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
                 jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                 jobModel.setJobStatuses(Arrays.asList(jobStatus));
                 //log job submit status
-                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 
                 //for local, job gets completed synchronously
                 //so changing job status to complete
@@ -113,7 +119,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
                 jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                 jobModel.setJobStatuses(Arrays.asList(jobStatus));
                 //log job complete status
-                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 
 
                 taskStatus = new TaskStatus(TaskState.COMPLETED);
@@ -128,7 +134,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
                 }
             }
 
-        } catch (GFacException | IOException | AppCatalogException | ApplicationSettingsException e) {
+        } catch (WorkerException | IOException | AppCatalogException | ApplicationSettingsException e) {
             String msg = "Error occurred while submitting a local job";
             log.error(msg, e);
             taskStatus.setReason(msg);