You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/11/13 17:44:49 UTC
airavata git commit: fixing issues in ssh job submission
Repository: airavata
Updated Branches:
refs/heads/master d3d04cd17 -> afb1cde05
fixing issues in ssh job submission
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/afb1cde0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/afb1cde0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/afb1cde0
Branch: refs/heads/master
Commit: afb1cde05cfaef856123de6c7535b330a73b11c8
Parents: d3d04cd
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Thu Nov 13 11:44:21 2014 -0500
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Thu Nov 13 11:44:21 2014 -0500
----------------------------------------------------------------------
.../gfac/core/context/JobExecutionContext.java | 17 ++++++++----
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 29 +++++++++++++++++---
.../airavata/gfac/monitor/HPCMonitorID.java | 21 ++++++++------
.../impl/pull/qstat/ResourceConnection.java | 18 ++++++------
.../gfac/ssh/provider/impl/SSHProvider.java | 14 ++--------
.../airavata/gfac/ssh/util/GFACSSHUtils.java | 28 +++++++++----------
6 files changed, 76 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 4dbde7e..ab15445 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -36,10 +36,7 @@ import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.notification.GFacNotifier;
import org.apache.airavata.gfac.core.provider.GFacProvider;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -117,6 +114,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
* use preferred job submission protocol.
*/
private JobSubmissionInterface preferredJobSubmissionInterface;
+
+ private ResourceJobManager resourceJobManager;
/**
* List of job submission protocols sorted by priority order.
*/
@@ -300,7 +299,15 @@ public class JobExecutionContext extends AbstractContext implements Serializable
this.inPath = false;
}
- public SecurityContext getSecurityContext(String name) throws GFacException{
+ public ResourceJobManager getResourceJobManager() {
+ return resourceJobManager;
+ }
+
+ public void setResourceJobManager(ResourceJobManager resourceJobManager) {
+ this.resourceJobManager = resourceJobManager;
+ }
+
+ public SecurityContext getSecurityContext(String name) throws GFacException{
SecurityContext secContext = securityContext.get(name);
return secContext;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 666190e..b06dbab 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.core.cpi;
import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -53,10 +54,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
-import org.apache.airavata.model.appcatalog.computeresource.FileSystems;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.workspace.experiment.*;
@@ -338,6 +336,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// set compute resource configuration as default preferred values, after that replace those with gateway user preferences.
populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
+ populateResourceJobManager(jobExecutionContext);
// if gateway resource preference is set
if (gatewayResourcePreferences != null ) {
if (gatewayResourcePreferences.getScratchLocation() == null) {
@@ -397,7 +396,9 @@ public class BetterGfacImpl implements GFac,Watcher {
* Stdout and Stderr for Shell
*/
jobExecutionContext.setStandardOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout");
+ System.out.println("*********************** standared output ************* " + jobExecutionContext.getStandardOutput());
jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr");
+ System.out.println("*********************** standared error ************* " + jobExecutionContext.getStandardError());
}
private void populateDefaultComputeResourceConfiguration(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, ComputeResourceDescription computeResource) {
@@ -416,6 +417,26 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
+ private void populateResourceJobManager (JobExecutionContext jobExecutionContext) {
+ try {
+ JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+ JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ if (submissionProtocol == JobSubmissionProtocol.SSH) {
+ SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (sshJobSubmission != null){
+ jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
+ }
+ } else if (submissionProtocol == JobSubmissionProtocol.LOCAL){
+ LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (localJobSubmission != null){
+ jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
+ }
+ }
+ } catch (AppCatalogException e) {
+ log.error("Error occured while retrieving job submission interface", e);
+ }
+ }
+
private boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
// We need to check whether this job is submitted as a part of a large workflow. If yes,
// we need to setup workflow tracking listerner.
http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
index 12b7ad9..7ba5b6b 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -19,26 +19,21 @@ package org.apache.airavata.gfac.monitor;/*
*
*/
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
import java.util.Date;
-import java.util.Map;
public class HPCMonitorID extends MonitorID {
private final static Logger logger = LoggerFactory.getLogger(HPCMonitorID.class);
@@ -67,10 +62,18 @@ public class HPCMonitorID extends MonitorID {
SecurityContext securityContext = jobExecutionContext.getSecurityContext(hostAddress);
ServerInfo serverInfo = null;
if (securityContext != null) {
- serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
- }
- if (serverInfo.getUserName() != null) {
- setUserName(serverInfo.getUserName());
+ if (securityContext instanceof GSISecurityContext){
+ serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
+ if (serverInfo.getUserName() != null) {
+ setUserName(serverInfo.getUserName());
+ }
+ }
+ if (securityContext instanceof SSHSecurityContext){
+ serverInfo = (((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo();
+ if (serverInfo.getUserName() != null) {
+ setUserName(serverInfo.getUserName());
+ }
+ }
}
} catch (GFacException e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
index e7a081b..1a76c3d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.monitor.impl.pull.qstat;
import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
import org.apache.airavata.gfac.monitor.HostMonitorData;
@@ -49,16 +50,17 @@ public class ResourceConnection {
public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException {
MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
try {
- GSISecurityContext securityContext = (GSISecurityContext)
- monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
+ SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
if(securityContext != null) {
- cluster = (PBSCluster) securityContext.getPbsCluster();
- }else {
- SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
- monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
- cluster = (PBSCluster)sshSecurityContext.getPbsCluster();
+ if (securityContext instanceof GSISecurityContext) {
+ GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext;
+ cluster = (PBSCluster) gsiSecurityContext.getPbsCluster();
+ } else if (securityContext instanceof SSHSecurityContext) {
+ SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
+ securityContext;
+ cluster = (PBSCluster) sshSecurityContext.getPbsCluster();
+ }
}
-
// we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
// we are using our own credentials and not using one users account to do everything.
authenticationInfo = authInfo;
http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index fd618e4..85b4b5f 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -55,10 +55,7 @@ import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.JobDetails;
@@ -86,10 +83,8 @@ public class SSHProvider extends AbstractProvider {
try {
super.initialize(jobExecutionContext);
String hostAddress = jobExecutionContext.getHostName();
- AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
- JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
- SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
- ResourceJobManagerType resourceJobManagerType = sshJobSubmission.getResourceJobManager().getResourceJobManagerType();
+ ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+ ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
GFACSSHUtils.addSecurityContext(jobExecutionContext);
}
@@ -115,9 +110,6 @@ public class SSHProvider extends AbstractProvider {
} else {
hpcType = true;
}
- } catch (AppCatalogException e) {
- log.error("Error while creating app catalog", e);
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
} catch (ApplicationSettingsException e) {
log.error(e.getMessage());
throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index cec4d28..00b5e16 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -22,14 +22,9 @@ package org.apache.airavata.gfac.ssh.util;
import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.RequestData;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -47,17 +42,11 @@ import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.FileArrayType;
-import org.apache.airavata.schemas.gfac.StringArrayType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,7 +86,7 @@ public class GFACSSHUtils {
Cluster pbsCluster = null;
try {
TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData);
- String installedParentPath = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getExecutablePath();
+ String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
if (installedParentPath == null) {
installedParentPath = "/";
}
@@ -224,6 +213,17 @@ public class GFACSSHUtils {
public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) {
JobDescriptor jobDescriptor = new JobDescriptor();
TaskDetails taskData = jobExecutionContext.getTaskData();
+ ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+ Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+ String jobManagerBinPath = resourceJobManager.getJobManagerBinPath();
+ if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
+ for (JobManagerCommand command : jobManagerCommands.keySet()) {
+ if (command == JobManagerCommand.SUBMISSION) {
+ String commandVal = jobManagerCommands.get(command);
+ jobDescriptor.setJobSubmitter(commandVal);
+ }
+ }
+ }
// this is common for any application descriptor
jobDescriptor.setCallBackIp(ServerSettings.getIp());
jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
@@ -257,7 +257,7 @@ public class GFACSSHUtils {
int totalNodeCount = taskScheduling.getNodeCount();
int totalCPUCount = taskScheduling.getTotalCPUCount();
-// jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+
if (taskScheduling.getComputationalProjectAccount() != null) {
jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
}