You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/05/02 16:12:24 UTC
airavata git commit: Add implementation for LocalJobSubmission
Repository: airavata
Updated Branches:
refs/heads/feature-workload-mgmt 702aa50ae -> 9a3617604
Add implementation for LocalJobSubmission
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9a361760
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9a361760
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9a361760
Branch: refs/heads/feature-workload-mgmt
Commit: 9a361760406c1e3c2bf5862d77ad6a0cff139fb3
Parents: 702aa50
Author: Gourav Shenoy <go...@apache.org>
Authored: Tue May 2 12:12:22 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Tue May 2 12:12:22 2017 -0400
----------------------------------------------------------------------
.../impl/LocalJobSubmissionTask.java | 38 +++++++++++---------
1 file changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/9a361760/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
index 8bd73a4..804fce2 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java
@@ -23,13 +23,6 @@ package org.apache.airavata.worker.task.jobsubmission.impl;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
@@ -42,6 +35,19 @@ import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.worker.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.Script;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +56,7 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
-public class LocalJobSubmissionTask implements JobSubmissionTask{
+public class LocalJobSubmissionTask implements JobSubmissionTask {
private static final Logger log = LoggerFactory.getLogger(LocalJobSubmissionTask.class);
private ProcessBuilder builder;
@@ -67,25 +73,25 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
jobModel.setTaskId(taskContext.getTaskId());
RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
- GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext,taskContext);
+ GroovyMap groovyMap = JobSubmissionUtils.createGroovyMap(processContext,taskContext);
String jobId = AiravataUtils.getId("JOB_ID_");
jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
jobModel.setJobId(jobId);
- ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext);
JobManagerConfiguration jConfig = null;
if (resourceJobManager != null) {
- jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+ jConfig = JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager);
}
JobStatus jobStatus = new JobStatus();
- File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig);
+ File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig);
if (jobFile != null && jobFile.exists()) {
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- GFacUtils.saveJobModel(processContext, jobModel);
+ JobSubmissionUtils.saveJobModel(processContext, jobModel);
JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
processContext.getWorkingDir());
@@ -96,7 +102,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
//log job submit status
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
//for local, job gets completed synchronously
//so changing job status to complete
@@ -113,7 +119,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
jobModel.setJobStatuses(Arrays.asList(jobStatus));
//log job complete status
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
taskStatus = new TaskStatus(TaskState.COMPLETED);
@@ -128,7 +134,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
}
}
- } catch (GFacException | IOException | AppCatalogException | ApplicationSettingsException e) {
+ } catch (WorkerException | IOException | AppCatalogException | ApplicationSettingsException e) {
String msg = "Error occurred while submitting a local job";
log.error(msg, e);
taskStatus.setReason(msg);