You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/11/13 17:44:49 UTC

airavata git commit: fixing issues in ssh job submission

Repository: airavata
Updated Branches:
  refs/heads/master d3d04cd17 -> afb1cde05


fixing issues in ssh job submission


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/afb1cde0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/afb1cde0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/afb1cde0

Branch: refs/heads/master
Commit: afb1cde05cfaef856123de6c7535b330a73b11c8
Parents: d3d04cd
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Thu Nov 13 11:44:21 2014 -0500
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Thu Nov 13 11:44:21 2014 -0500

----------------------------------------------------------------------
 .../gfac/core/context/JobExecutionContext.java  | 17 ++++++++----
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 29 +++++++++++++++++---
 .../airavata/gfac/monitor/HPCMonitorID.java     | 21 ++++++++------
 .../impl/pull/qstat/ResourceConnection.java     | 18 ++++++------
 .../gfac/ssh/provider/impl/SSHProvider.java     | 14 ++--------
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    | 28 +++++++++----------
 6 files changed, 76 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 4dbde7e..ab15445 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -36,10 +36,7 @@ import org.apache.airavata.gfac.SecurityContext;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.notification.GFacNotifier;
 import org.apache.airavata.gfac.core.provider.GFacProvider;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -117,6 +114,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
      * use preferred job submission protocol.
      */
     private JobSubmissionInterface preferredJobSubmissionInterface;
+
+    private ResourceJobManager resourceJobManager;
     /**
      * List of job submission protocols sorted by priority order.
      */
@@ -300,7 +299,15 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.inPath = false;
     }
 
-	public SecurityContext getSecurityContext(String name) throws GFacException{
+    public ResourceJobManager getResourceJobManager() {
+        return resourceJobManager;
+    }
+
+    public void setResourceJobManager(ResourceJobManager resourceJobManager) {
+        this.resourceJobManager = resourceJobManager;
+    }
+
+    public SecurityContext getSecurityContext(String name) throws GFacException{
 		SecurityContext secContext = securityContext.get(name);
 		return secContext;
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 666190e..b06dbab 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.core.cpi;
 
 import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -53,10 +54,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
-import org.apache.airavata.model.appcatalog.computeresource.FileSystems;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.*;
@@ -338,6 +336,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 
         // set compute resource configuration as default preferred values, after that replace those with gateway user preferences.
         populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
+        populateResourceJobManager(jobExecutionContext);
         // if gateway resource preference is set
         if (gatewayResourcePreferences != null ) {
             if (gatewayResourcePreferences.getScratchLocation() == null) {
@@ -397,7 +396,9 @@ public class BetterGfacImpl implements GFac,Watcher {
             * Stdout and Stderr for Shell
             */
         jobExecutionContext.setStandardOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout");
+        System.out.println("*********************** standared output ************* " + jobExecutionContext.getStandardOutput());
         jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr");
+        System.out.println("*********************** standared error ************* " + jobExecutionContext.getStandardError());
     }
 
     private void populateDefaultComputeResourceConfiguration(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, ComputeResourceDescription computeResource) {
@@ -416,6 +417,26 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
+    private void populateResourceJobManager (JobExecutionContext jobExecutionContext) {
+        try {
+            JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+            JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+            if (submissionProtocol == JobSubmissionProtocol.SSH) {
+                SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null){
+                    jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL){
+                LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (localJobSubmission != null){
+                    jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
+                }
+            }
+        } catch (AppCatalogException e) {
+           log.error("Error occured while retrieving job submission interface", e);
+        }
+    }
+
     private boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
         // We need to check whether this job is submitted as a part of a large workflow. If yes,
         // we need to setup workflow tracking listerner.

http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
index 12b7ad9..7ba5b6b 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -19,26 +19,21 @@ package org.apache.airavata.gfac.monitor;/*
  *
 */
 
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.SecurityContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
 import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
 import java.util.Date;
-import java.util.Map;
 
 public class HPCMonitorID extends MonitorID {
     private final static Logger logger = LoggerFactory.getLogger(HPCMonitorID.class);
@@ -67,10 +62,18 @@ public class HPCMonitorID extends MonitorID {
                 SecurityContext securityContext = jobExecutionContext.getSecurityContext(hostAddress);
                 ServerInfo serverInfo = null;
                 if (securityContext != null) {
-                    serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
-                }
-                if (serverInfo.getUserName() != null) {
-                    setUserName(serverInfo.getUserName());
+                    if (securityContext instanceof  GSISecurityContext){
+                        serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
+                        if (serverInfo.getUserName() != null) {
+                            setUserName(serverInfo.getUserName());
+                        }
+                    }
+                    if (securityContext instanceof SSHSecurityContext){
+                        serverInfo = (((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo();
+                        if (serverInfo.getUserName() != null) {
+                            setUserName(serverInfo.getUserName());
+                        }
+                    }
                 }
             } catch (GFacException e) {
                 e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
index e7a081b..1a76c3d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.monitor.impl.pull.qstat;
 
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
@@ -49,16 +50,17 @@ public class ResourceConnection {
     public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException {
         MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
         try {
-            GSISecurityContext securityContext = (GSISecurityContext)
-                    monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
+            SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
             if(securityContext != null) {
-                cluster = (PBSCluster) securityContext.getPbsCluster();
-            }else {
-                SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
-                        monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
-                cluster = (PBSCluster)sshSecurityContext.getPbsCluster();
+                if (securityContext instanceof GSISecurityContext) {
+                    GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext;
+                    cluster = (PBSCluster) gsiSecurityContext.getPbsCluster();
+                } else if (securityContext instanceof  SSHSecurityContext) {
+                    SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
+                            securityContext;
+                    cluster = (PBSCluster) sshSecurityContext.getPbsCluster();
+                }
             }
-
             // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
             // we are using our own credentials and not using one users account to do everything.
             authenticationInfo = authInfo;

http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index fd618e4..85b4b5f 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -55,10 +55,7 @@ import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
 import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
 import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.ErrorCategory;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
@@ -86,10 +83,8 @@ public class SSHProvider extends AbstractProvider {
         try {
             super.initialize(jobExecutionContext);
             String hostAddress = jobExecutionContext.getHostName();
-            AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
-            JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
-            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
-            ResourceJobManagerType resourceJobManagerType = sshJobSubmission.getResourceJobManager().getResourceJobManagerType();
+            ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+            ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType();
             if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
                 GFACSSHUtils.addSecurityContext(jobExecutionContext);
             }
@@ -115,9 +110,6 @@ public class SSHProvider extends AbstractProvider {
             } else {
                 hpcType = true;
             }
-        } catch (AppCatalogException e) {
-           log.error("Error while creating app catalog", e);
-            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
         } catch (ApplicationSettingsException e) {
             log.error(e.getMessage());
             throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/airavata/blob/afb1cde0/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index cec4d28..00b5e16 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -22,14 +22,9 @@ package org.apache.airavata.gfac.ssh.util;
 
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
 import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.RequestData;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -47,17 +42,11 @@ import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.ErrorCategory;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.FileArrayType;
-import org.apache.airavata.schemas.gfac.StringArrayType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,7 +86,7 @@ public class GFACSSHUtils {
                     Cluster pbsCluster = null;
                     try {
                         TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData);
-                        String installedParentPath = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getExecutablePath();
+                        String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
                         if (installedParentPath == null) {
                             installedParentPath = "/";
                         }
@@ -224,6 +213,17 @@ public class GFACSSHUtils {
     public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) {
         JobDescriptor jobDescriptor = new JobDescriptor();
         TaskDetails taskData = jobExecutionContext.getTaskData();
+        ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+        Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+        String jobManagerBinPath = resourceJobManager.getJobManagerBinPath();
+        if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
+            for (JobManagerCommand command : jobManagerCommands.keySet()) {
+                if (command == JobManagerCommand.SUBMISSION) {
+                    String commandVal = jobManagerCommands.get(command);
+                    jobDescriptor.setJobSubmitter(commandVal);
+                }
+            }
+        }
         // this is common for any application descriptor
         jobDescriptor.setCallBackIp(ServerSettings.getIp());
         jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
@@ -257,7 +257,7 @@ public class GFACSSHUtils {
             int totalNodeCount = taskScheduling.getNodeCount();
             int totalCPUCount = taskScheduling.getTotalCPUCount();
 
-//        jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+
             if (taskScheduling.getComputationalProjectAccount() != null) {
                 jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
             }