You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/17 21:51:13 UTC
airavata git commit: procees context changes
Repository: airavata
Updated Branches:
refs/heads/master 367ddca6b -> 749ad9330
procees context changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/749ad933
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/749ad933
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/749ad933
Branch: refs/heads/master
Commit: 749ad9330369d0d0699331ce6e4c9a42df73ed21
Parents: 367ddca
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Wed Jun 17 15:51:07 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Wed Jun 17 15:51:07 2015 -0400
----------------------------------------------------------------------
.../gfac/core/context/ProcessContext.java | 10 ++
.../gfac/impl/task/JobSubmissionTaskImpl.java | 108 ------------------
.../gfac/impl/task/SSHJobSubmissionTask.java | 110 +++++++++++++++++++
3 files changed, 120 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/749ad933/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 4311b81..0ae69b8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -30,6 +30,7 @@ import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescr
import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
@@ -62,6 +63,7 @@ public class ProcessContext {
private String stderrLocation;
private JobSubmissionProtocol jobSubmissionProtocol;
private DataMovementProtocol dataMovementProtocol;
+ private JobModel jobModel;
public ProcessContext(String processId, String gatewayId, String tokenId) {
this.processId = processId;
@@ -234,4 +236,12 @@ public class ProcessContext {
public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) {
this.dataMovementProtocol = dataMovementProtocol;
}
+
+ public JobModel getJobModel() {
+ return jobModel;
+ }
+
+ public void setJobModel(JobModel jobModel) {
+ this.jobModel = jobModel;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/749ad933/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
deleted file mode 100644
index 7d65f18..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-package org.apache.airavata.gfac.impl.task;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.*;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-
-public class JobSubmissionTaskImpl implements JobSubmissionTask {
- private static final Logger log = LoggerFactory.getLogger(JobSubmissionTaskImpl.class);
- @Override
- public void init(Map<String, String> propertyMap) throws TaskException {
-
- }
-
- @Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
- try {
- ProcessContext processContext = taskContext.getParentProcessContext();
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
- JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
- ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
- JobManagerConfiguration jConfig = null;
- if (resourceJobManager != null){
- String installedParentPath = resourceJobManager.getJobManagerBinPath();
- if (installedParentPath == null) {
- installedParentPath = "/";
- }
- ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
- if (resourceJobManagerType == null) {
- log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
- jConfig = Factory.getPBSJobManager(installedParentPath);
- } else {
- if (Constants.PBS_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
- jConfig = Factory.getPBSJobManager(installedParentPath);
- } else if (Constants.SLURM_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
- jConfig = Factory.getSLURMJobManager(installedParentPath);
- } else if (Constants.SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(resourceJobManagerType.toString())) {
- jConfig = Factory.getUGEJobManager(installedParentPath);
- } else if (Constants.LSF_JOB_MANAGER.equals(resourceJobManagerType.toString())) {
- jConfig = Factory.getLSFJobManager(installedParentPath);
- }
- }
- }
- File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
- if (jobFile != null && jobFile.exists()){
- String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
- }
-
- } catch (AppCatalogException e) {
- log.error("Error while instatiating app catalog",e);
- throw new TaskException("Error while instatiating app catalog", e);
- } catch (ApplicationSettingsException e) {
- log.error("Error occurred while creating job descriptor", e);
- throw new TaskException("Error occurred while creating job descriptor", e);
- } catch (GFacException e) {
- log.error("Error occurred while creating job descriptor", e);
- throw new TaskException("Error occurred while creating job descriptor", e);
- } catch (SSHApiException e) {
- log.error("Error occurred while submitting the job", e);
- throw new TaskException("Error occurred while submitting the job", e);
- }
- return null;
- }
-
- @Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/749ad933/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
new file mode 100644
index 0000000..f14502f
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+public class SSHJobSubmissionTask implements JobSubmissionTask {
+ private static final Logger log = LoggerFactory.getLogger(SSHJobSubmissionTask.class);
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskState execute(TaskContext taskContext) throws TaskException {
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ if (jobModel == null){
+ jobModel = new JobModel();
+ }
+ RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+ jobModel.setJobName(jobDescriptor.getJobName());
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+ if (resourceJobManager != null){
+ String installedParentPath = resourceJobManager.getJobManagerBinPath();
+ if (installedParentPath == null) {
+ installedParentPath = "/";
+ }
+ ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
+ if (resourceJobManagerType == null) {
+ log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = Factory.getPBSJobManager(installedParentPath);
+ } else {
+ if (ResourceJobManagerType.PBS == resourceJobManagerType) {
+ jConfig = Factory.getPBSJobManager(installedParentPath);
+ } else if (ResourceJobManagerType.SLURM == resourceJobManagerType) {
+ jConfig = Factory.getSLURMJobManager(installedParentPath);
+ } else if (ResourceJobManagerType.UGE == resourceJobManagerType) {
+ jConfig = Factory.getUGEJobManager(installedParentPath);
+ } else if (ResourceJobManagerType.LSF == resourceJobManagerType) {
+ jConfig = Factory.getLSFJobManager(installedParentPath);
+ }
+ }
+ }
+ File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
+ if (jobFile != null && jobFile.exists()){
+ String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+ }
+
+ } catch (AppCatalogException e) {
+ log.error("Error while instatiating app catalog",e);
+ throw new TaskException("Error while instatiating app catalog", e);
+ } catch (ApplicationSettingsException e) {
+ log.error("Error occurred while creating job descriptor", e);
+ throw new TaskException("Error occurred while creating job descriptor", e);
+ } catch (GFacException e) {
+ log.error("Error occurred while creating job descriptor", e);
+ throw new TaskException("Error occurred while creating job descriptor", e);
+ } catch (SSHApiException e) {
+ log.error("Error occurred while submitting the job", e);
+ throw new TaskException("Error occurred while submitting the job", e);
+ }
+ return null;
+ }
+
+ @Override
+ public TaskState recover(TaskContext taskContext) throws TaskException {
+ return null;
+ }
+}