You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/09/30 22:08:23 UTC
svn commit: r1527755 - in /airavata/sandbox/gsissh: ./
src/main/java/org/apache/airavata/gsi/ssh/api/
src/main/java/org/apache/airavata/gsi/ssh/api/job/
src/main/java/org/apache/airavata/gsi/ssh/impl/
src/main/java/org/apache/airavata/gsi/ssh/listener/...
Author: lahiru
Date: Mon Sep 30 20:08:23 2013
New Revision: 1527755
URL: http://svn.apache.org/r1527755
Log:
committing major changes to api.
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Node.java
- copied, changed from r1520685, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Machine.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java
- copied, changed from r1520685, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/Job.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/PBSCluster.java
- copied, changed from r1523779, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
Removed:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Machine.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/Job.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
Modified:
airavata/sandbox/gsissh/pom.xml
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Core.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/MyProxyAuthenticationInfo.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt
airavata/sandbox/gsissh/src/main/resources/gsissh.properties
airavata/sandbox/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
Modified: airavata/sandbox/gsissh/pom.xml
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/pom.xml?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/pom.xml (original)
+++ airavata/sandbox/gsissh/pom.xml Mon Sep 30 20:08:23 2013
@@ -32,6 +32,10 @@
<id>ogce.m2.all</id>
<url>http://community.ucs.indiana.edu:9090/archiva/repository/ogce.m2.all</url>
</repository>
+<repository>
+ <id>ncsa</id>
+ <url>https://opensource.ncsa.illinois.edu/nexus/content/repositories/snapshots/</url>
+ </repository>
</repositories>
<dependencies>
@@ -82,6 +86,18 @@
<artifactId>xmlbeans</artifactId>
<version>${xmlbeans.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.jcabi</groupId>
+ <artifactId>jcabi-aspects</artifactId>
+ <version>0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjrt</artifactId>
+ <version>1.6.12</version>
+ <scope>runtime</scope>
+ </dependency>
+
</dependencies>
<build>
@@ -125,7 +141,8 @@
<configuration>
<schemaDirectory>src/main/resources/schemas</schemaDirectory>
<xmlConfigs>
- <xmlConfig implementation="java.io.File">src/main/resources/schemas/gsissh-schemas.xsdconfig</xmlConfig>
+ <xmlConfig implementation="java.io.File">src/main/resources/schemas/gsissh-schemas.xsdconfig
+ </xmlConfig>
</xmlConfigs>
<outputJar>target/generated/${project.artifactId}-${project.version}.jar</outputJar>
</configuration>
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java Mon Sep 30 20:08:23 2013
@@ -20,9 +20,8 @@
*/
package org.apache.airavata.gsi.ssh.api;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.gsi.ssh.api.job.Job;
-import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
/**
* This interface represents a Cluster machine
@@ -34,64 +33,63 @@ public interface Cluster {
/**
* This will submit a job to the cluster with a given pbs file and some parameters
*
- * @param pbsFilePath
- * @param workingDirectory
- * @return
- * @throws SSHApiException
+ * @param pbsFilePath path of the pbs file
+ * @param workingDirectory working directory where pbs should has to copy
+ * @return jobId after successful job submission
+ * @throws SSHApiException throws exception during error
*/
- public String submitAsyncJobWithPBS(String pbsFilePath, String workingDirectory) throws SSHApiException;
+ public String submitBatchJobWithPBS(String pbsFilePath, String workingDirectory) throws SSHApiException;
/**
* This will submit the given job and not performing any monitoring
*
- * @param jobDescriptor
- * @return
- * @throws SSHApiException
+ * @param jobDescriptor job descriptor to submit to cluster, this contains all the parameter
+ * @return jobID after successful job submission.
+ * @throws SSHApiException throws exception during error
*/
- public String submitAsyncJob(Job jobDescriptor) throws SSHApiException;
+ public String submitBatchJob(JobDescriptor jobDescriptor) throws SSHApiException;
/**
* This will get all the information about the cluster and store them as parameters
* So that api user can extract required information about the cluster
*
- * @return
- * @throws SSHApiException
+ * @return return a cluster which consists of information about nodes etc.
+ * @throws SSHApiException throws exception during error
*/
public Cluster loadCluster() throws SSHApiException;
/**
- * This will copy the lFile to rFile location in configured cluster
+ * This will copy the localFile to remoteFile location in configured cluster
*
- * @param rFile
- * @param lFile
- * @return
- * @throws SSHApiException
+ * @param rFile remote file location
+ * @param lFile local file path of the file which needs to copy to remote location
+ * @throws SSHApiException throws exception during error
*/
public void scpTo(String rFile, String lFile) throws SSHApiException;
/**
- * submit a job and register the listener so that status changes will be triggers
- * and appropricate action implemented in the JobSubmissionListener will get invoked
- *
- * @param jobDescriptor
- * @param listener
- * @return
- * @throws SSHApiException
+ * This will get the job description of a job which is there in the cluster
+ * if jbo is not available with the given ID it returns
+ * @param jobID jobId has to pass
+ * @return Returns full job description of the job which submitted successfully
+ * @throws SSHApiException throws exception during error
*/
- public String submitAsyncJob(Job jobDescriptor, JobSubmissionListener listener) throws SSHApiException;
+ public JobDescriptor getJobDescriptorById(String jobID) throws SSHApiException;
/**
- * @param jobID
- * @return
- * @throws SSHApiException
+ * This will delete the given job from the queue
+ *
+ * @param jobID jobId of the job which user wants to delete
+ * @return return the description of the deleted job
+ * @throws SSHApiException throws exception during error
*/
- public Job getJobById(String jobID) throws SSHApiException;
+ public JobDescriptor cancelJob(String jobID) throws SSHApiException;
/**
- * This will delete the given job from the queue
- * @param jobID
- * @return
- * @throws SSHApiException
+ * This will get the job status of the the job associated with this jobId
+ * @param jobID jobId of the job user want to get the status
+ * @return job status of the given jobID
+ * @throws SSHApiException throws exception during error
*/
- public Job cancelJob(String jobID) throws SSHApiException;
+ public JobStatus getJobStatus(String jobID) throws SSHApiException;
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java Mon Sep 30 20:08:23 2013
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gsi.ssh.api;
+import com.jcabi.aspects.RetryOnFailure;
import com.jcraft.jsch.*;
import org.apache.airavata.gsi.ssh.config.ConfigReader;
import org.apache.airavata.gsi.ssh.jsch.ExtendedJSch;
@@ -50,6 +51,7 @@ public class CommandExecutor {
* @param commandOutput
* @throws SSHApiException
*/
+ @RetryOnFailure(attempts = 3, delay = 1000, verbose = true)
public static Session executeCommand(CommandInfo commandInfo, Session session,
CommandOutput commandOutput) throws SSHApiException {
@@ -70,17 +72,16 @@ public class CommandExecutor {
channel.setInputStream(null);
((ChannelExec) channel).setErrStream(commandOutput.getStandardError());
-
try {
channel.connect();
} catch (JSchException e) {
channel.disconnect();
session.disconnect();
-
throw new SSHApiException("Unable to retrieve command output. Command - " + command, e);
}
+
commandOutput.onOutput(channel);
//Only disconnecting the channel, session can be reused
channel.disconnect();
@@ -95,7 +96,8 @@ public class CommandExecutor {
* @param serverInfo The SSHing server information.
* @param authenticationInfo Security data needs to be communicated with remote server.
* @param commandOutput The output of the command.
- * @throws SSHApiException
+ * @param configReader configuration required for ssh/gshissh connection
+ * @throws SSHApiException throw exception when error occurs
*/
public static void executeCommand(CommandInfo commandInfo, ServerInfo serverInfo,
AuthenticationInfo authenticationInfo,
@@ -103,10 +105,10 @@ public class CommandExecutor {
System.setProperty(X509_CERT_DIR, (String) authenticationInfo.getProperties().get("X509_CERT_DIR"));
JSch jsch = new ExtendedJSch();
- log.info("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - "
+ log.debug("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - "
+ serverInfo.getUserName());
- Session session = null;
+ Session session;
try {
session = jsch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort());
@@ -136,7 +138,7 @@ public class CommandExecutor {
String command = commandInfo.getCommand();
- Channel channel = null;
+ Channel channel;
try {
channel = session.openChannel("exec");
((ChannelExec) channel).setCommand(command);
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Core.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Core.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Core.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Core.java Mon Sep 30 20:08:23 2013
@@ -20,13 +20,13 @@
*/
package org.apache.airavata.gsi.ssh.api;
-import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
/**
* This represents a CPU core of a machine in the cluster
*/
public class Core {
- private Job job;
+ private JobDescriptor job;
private String id;
public Core(String id) {
@@ -48,11 +48,11 @@ public class Core {
/**
* @return job running on the core
*/
- public Job getJob() {
+ public JobDescriptor getJob() {
return job;
}
- public void setJob(Job job) {
+ public void setJob(JobDescriptor job) {
this.job = job;
}
Copied: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Node.java (from r1520685, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Machine.java)
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Node.java?p2=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Node.java&p1=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Machine.java&r1=1520685&r2=1527755&rev=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Machine.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Node.java Mon Sep 30 20:08:23 2013
@@ -22,7 +22,7 @@ package org.apache.airavata.gsi.ssh.api;
import java.util.HashMap;
-public class Machine {
+public class Node {
private String Name;
private Core[] Cores;
private String state;
Copied: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java (from r1520685, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/Job.java)
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java?p2=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java&p1=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/Job.java&r1=1520685&r2=1527755&rev=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/Job.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobDescriptor.java Mon Sep 30 20:08:23 2013
@@ -21,6 +21,7 @@
package org.apache.airavata.gsi.ssh.api.job;
import org.apache.airavata.gsi.ssh.api.CommandOutput;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
import org.apache.airavata.gsi.ssh.x2012.x12.*;
import org.apache.xmlbeans.XmlException;
@@ -30,22 +31,22 @@ import java.util.List;
* This class define a job with required parameters, based on this configuration API is generating a Pbs script and
* submit the job to the computing resource
*/
-public class Job {
+public class JobDescriptor {
private JobDescriptorDocument jobDescriptionDocument;
- public Job() {
+ public JobDescriptor() {
jobDescriptionDocument = JobDescriptorDocument.Factory.newInstance();
jobDescriptionDocument.addNewJobDescriptor();
}
- public Job(JobDescriptorDocument jobDescriptorDocument) {
+ public JobDescriptor(JobDescriptorDocument jobDescriptorDocument) {
this.jobDescriptionDocument = jobDescriptorDocument;
}
- public Job(CommandOutput commandOutput) {
+ public JobDescriptor(CommandOutput commandOutput) {
jobDescriptionDocument = JobDescriptorDocument.Factory.newInstance();
jobDescriptionDocument.addNewJobDescriptor();
}
@@ -59,11 +60,11 @@ public class Job {
return this.jobDescriptionDocument;
}
- public static Job fromXML(String xml)
+ public static JobDescriptor fromXML(String xml)
throws XmlException {
JobDescriptorDocument parse = JobDescriptorDocument.Factory
.parse(xml);
- Job jobDescriptor = new Job(parse);
+ JobDescriptor jobDescriptor = new JobDescriptor(parse);
return jobDescriptor;
}
@@ -113,8 +114,14 @@ public class Job {
this.getJobDescriptorDocument().getJobDescriptor().setProcessesPerNode(name);
}
- public void setMaxWallTime(String name) {
- this.getJobDescriptorDocument().getJobDescriptor().setMaxWallTime(name);
+ /**
+ * Users can pass the minute count for maxwalltime
+ * @param minutes
+ */
+ public void setMaxWallTime(String minutes) {
+ this.getJobDescriptorDocument().getJobDescriptor().setMaxWallTime(
+ CommonUtils.maxWallTimeCalculator(Integer.getInteger(minutes)));
+
}
public void setAcountString(String name) {
@@ -185,7 +192,33 @@ public class Job {
this.getJobDescriptorDocument().getJobDescriptor().setSubmitArgs(submitArgs);
}
-
+ public void setPreJobCommands(String[] commands){
+ if(this.getJobDescriptorDocument().getJobDescriptor().getPreJobCommands() == null){
+ this.getJobDescriptorDocument().getJobDescriptor().addNewPreJobCommands();
+ }
+ this.getJobDescriptorDocument().getJobDescriptor().getPreJobCommands().setCommandArray(commands);
+ }
+
+ public void setPostJobCommands(String[] commands){
+ if(this.getJobDescriptorDocument().getJobDescriptor().getPostJobCommands() == null){
+ this.getJobDescriptorDocument().getJobDescriptor().addNewPostJobCommands();
+ }
+ this.getJobDescriptorDocument().getJobDescriptor().getPostJobCommands().setCommandArray(commands);
+ }
+
+ public void addPreJobCommand(String command){
+ if(this.getJobDescriptorDocument().getJobDescriptor().getPreJobCommands() == null){
+ this.getJobDescriptorDocument().getJobDescriptor().addNewPreJobCommands();
+ }
+ this.getJobDescriptorDocument().getJobDescriptor().getPreJobCommands().addCommand(command);
+ }
+
+ public void addPostJobCommand(String command){
+ if(this.getJobDescriptorDocument().getJobDescriptor().getPostJobCommands() == null){
+ this.getJobDescriptorDocument().getJobDescriptor().addNewPostJobCommands();
+ }
+ this.getJobDescriptorDocument().getJobDescriptor().getPostJobCommands().addCommand(command);
+ }
public String getExecutablePath() {
return this.getJobDescriptorDocument().getJobDescriptor().getExecutablePath();
@@ -298,5 +331,18 @@ public class Job {
}
+ public String[] getPostJobCommands(){
+ if(this.getJobDescriptorDocument().getJobDescriptor().getPostJobCommands() != null) {
+ return this.getJobDescriptorDocument().getJobDescriptor().getPostJobCommands().getCommandArray();
+ }
+ return null;
+ }
+ public String[] getPreJobCommands(){
+ if(this.getJobDescriptorDocument().getJobDescriptor().getPreJobCommands() != null) {
+ return this.getJobDescriptorDocument().getJobDescriptor().getPreJobCommands().getCommandArray();
+ }
+ return null;
+ }
+
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java Mon Sep 30 20:08:23 2013
@@ -21,20 +21,22 @@
package org.apache.airavata.gsi.ssh.impl;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
public class DefaultJobSubmissionListener extends JobSubmissionListener {
- public void statusChanged(Job jobDescriptor) throws SSHApiException {
+ public void statusChanged(JobDescriptor jobDescriptor) throws SSHApiException {
System.out.println("Job status has changed to : " + jobDescriptor.getStatus());
}
@Override
+ public void statusChanged(JobStatus jobStatus) throws SSHApiException {
+ System.out.println("Job status has changed to : " + jobStatus.toString());
+ }
+
+ @Override
public boolean isJobDone() throws SSHApiException {
- if(getJobStatus().equals(JobStatus.C)){
- return true;
- }
- return false;
+ return getJobStatus().equals(JobStatus.C);
}
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/MyProxyAuthenticationInfo.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/MyProxyAuthenticationInfo.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/MyProxyAuthenticationInfo.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/MyProxyAuthenticationInfo.java Mon Sep 30 20:08:23 2013
@@ -57,41 +57,10 @@ public class MyProxyAuthenticationInfo e
return userName;
}
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getMyProxyUrl() {
- return myProxyUrl;
- }
-
- public void setMyProxyUrl(String myProxyUrl) {
- this.myProxyUrl = myProxyUrl;
- }
-
- public int getMyProxyPort() {
- return myProxyPort;
- }
-
- public void setMyProxyPort(int myProxyPort) {
- this.myProxyPort = myProxyPort;
- }
-
public int getLifeTime() {
return lifeTime;
}
- public void setLifeTime(int lifeTime) {
- this.lifeTime = lifeTime;
- }
public GSSCredential getCredentials() throws SecurityException {
return getMyProxyCredentials();
Copied: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/PBSCluster.java (from r1523779, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java)
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/PBSCluster.java?p2=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/PBSCluster.java&p1=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java&r1=1523779&r2=1527755&rev=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/PBSCluster.java Mon Sep 30 20:08:23 2013
@@ -22,10 +22,10 @@ package org.apache.airavata.gsi.ssh.impl
import com.jcraft.jsch.*;
import org.apache.airavata.gsi.ssh.api.*;
-import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.config.ConfigReader;
import org.apache.airavata.gsi.ssh.jsch.ExtendedJSch;
-import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
import org.apache.airavata.gsi.ssh.util.SSHUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
@@ -45,20 +45,19 @@ import java.util.ArrayList;
* this has most of the methods to be used by the end user of the
* library.
*/
-public class DefaultCluster implements Cluster {
+public class PBSCluster implements Cluster {
static {
JSch.setConfig("gssapi-with-mic.x509", "org.apache.airavata.gsi.ssh.GSSContextX509");
JSch.setConfig("userauth.gssapi-with-mic", "com.jcraft.jsch.UserAuthGSSAPIWithMICGSSCredentials");
}
- private static final Logger log = LoggerFactory.getLogger(DefaultCluster.class);
+ private static final Logger log = LoggerFactory.getLogger(PBSCluster.class);
public static final String X509_CERT_DIR = "X509_CERT_DIR";
- public static final String POLLING_FREQUENCEY = "polling.frequency";
public static final String SSH_SESSION_TIMEOUT = "ssh.session.timeout";
public static final String PBSTEMPLATE_XSLT = "PBSTemplate.xslt";
- private Machine[] Nodes;
+ private Node[] Nodes;
private ServerInfo serverInfo;
@@ -66,11 +65,11 @@ public class DefaultCluster implements C
private Session session;
- private static JSch jSch;
-
private ConfigReader configReader;
- public DefaultCluster(ServerInfo serverInfo, AuthenticationInfo authenticationInfo) throws SSHApiException {
+ private String installedPath;
+
+ public PBSCluster(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String installedPath) throws SSHApiException {
this.serverInfo = serverInfo;
@@ -78,21 +77,27 @@ public class DefaultCluster implements C
System.setProperty(X509_CERT_DIR, (String) authenticationInfo.getProperties().get(X509_CERT_DIR));
+ if(installedPath.endsWith("/")){
+ this.installedPath = installedPath;
+ }else {
+ this.installedPath = installedPath + "/";
+ }
+
try {
this.configReader = new ConfigReader();
} catch (IOException e) {
throw new SSHApiException("Unable to load system configurations.", e);
}
- this.jSch = new ExtendedJSch();
+ JSch jSch = new ExtendedJSch();
- log.info("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - "
+ log.debug("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - "
+ serverInfo.getUserName());
try {
session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort());
session.setTimeout(Integer.parseInt(configReader.getConfiguration(SSH_SESSION_TIMEOUT)));
- } catch (Exception e){
- throw new SSHApiException("An exception occurred while creating SSH session." +
+ } catch (Exception e) {
+ throw new SSHApiException("An exception occurred while creating SSH session." +
"Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() +
" connecting user name - "
+ serverInfo.getUserName(), e);
@@ -118,12 +123,12 @@ public class DefaultCluster implements C
}
- public String submitAsyncJobWithPBS(String pbsFilePath, String workingDirectory) throws SSHApiException {
+ public String submitBatchJobWithPBS(String pbsFilePath, String workingDirectory) throws SSHApiException {
this.scpTo(workingDirectory, pbsFilePath);
// since this is a constant we do not ask users to fill this
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qsub " +
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(this.installedPath + "qsub " +
workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
@@ -134,23 +139,23 @@ public class DefaultCluster implements C
if (jobIDReaderCommandOutput.getErrorifAvailable().equals("")) {
return jobIDReaderCommandOutput.getStdOutput();
} else {
+ //todo during failure do try
throw new SSHApiException(jobIDReaderCommandOutput.getStandardError().toString());
}
}
- public String submitAsyncJob(Job jobDescriptor) throws SSHApiException {
+ public String submitBatchJob(JobDescriptor jobDescriptor) throws SSHApiException {
TransformerFactory factory = TransformerFactory.newInstance();
URL resource = this.getClass().getClassLoader().getResource(PBSTEMPLATE_XSLT);
if (resource == null) {
- String error = "System configuration file '" + DefaultCluster.PBSTEMPLATE_XSLT
+ String error = "System configuration file '" + PBSCluster.PBSTEMPLATE_XSLT
+ "' not found in the classpath";
- log.error(error);
throw new SSHApiException(error);
}
Source xslt = new StreamSource(new File(resource.getPath()));
- Transformer transformer = null;
+ Transformer transformer;
StringWriter results = new StringWriter();
File tempPBSFile = null;
try {
@@ -159,7 +164,7 @@ public class DefaultCluster implements C
Source text = new StreamSource(new ByteArrayInputStream(jobDescriptor.toXML().getBytes()));
transformer.transform(text, new StreamResult(results));
- log.info("generated PBS:" + results.toString());
+ log.debug("generated PBS:" + results.toString());
// creating a temporary file using pbs script generated above
int number = new SecureRandom().nextInt();
@@ -168,11 +173,11 @@ public class DefaultCluster implements C
tempPBSFile = new File(Integer.toString(number) + ".pbs");
FileUtils.writeStringToFile(tempPBSFile, results.toString());
- //reusing submitAsyncJobWithPBS method to submit a job
+ //reusing submitBatchJobWithPBS method to submit a job
- String jobID = this.submitAsyncJobWithPBS(tempPBSFile.getAbsolutePath(),
+ String jobID = this.submitBatchJobWithPBS(tempPBSFile.getAbsolutePath(),
jobDescriptor.getWorkingDirectory());
- log.info("Job has successfully submitted, JobID : " + jobID);
+ log.debug("Job has successfully submitted, JobID : " + jobID);
return jobID.replace("\n", "");
} catch (TransformerConfigurationException e) {
throw new SSHApiException("Error parsing PBS transformation", e);
@@ -184,13 +189,15 @@ public class DefaultCluster implements C
" connecting user name - "
+ serverInfo.getUserName(), e);
} finally {
- tempPBSFile.delete();
+ if(tempPBSFile != null){
+ tempPBSFile.delete();
+ }
}
}
public Cluster loadCluster() throws SSHApiException {
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qnodes");
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(this.installedPath + "qnodes");
StandardOutReader stdOutReader = new StandardOutReader();
CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
@@ -201,12 +208,12 @@ public class DefaultCluster implements C
String[] Nodes = result.split("\n");
String[] line;
String header, value;
- Machine Node;
+ Node Node;
Core[] Cores = null;
- ArrayList<Machine> Machines = new ArrayList<Machine>();
+ ArrayList<Node> Machines = new ArrayList<Node>();
int i = 0;
while (i < Nodes.length) {
- Node = new Machine();
+ Node = new Node();
Node.setName(Nodes[i]);
i++;
@@ -233,13 +240,14 @@ public class DefaultCluster implements C
Node.setNtype(value);
else if ("jobs".equals(header)) {
String[] jobs = value.split(", ");
- Job jo = new Job();
+ JobDescriptor jo;
//Job[] Jobs = new Job[jobs.length];
- for (int j = 0; j < jobs.length; j++) {
- String[] c = jobs[j].split("/");
+ for (String job : jobs) {
+ String[] c = job.split("/");
String Jid = c[1];
- jo = this.getJobById(Jid);
+ jo = this.getJobDescriptorById(Jid);
int core = Integer.parseInt(c[0]);
+ assert Cores != null;
Cores[core].setJob(jo);
}
@@ -251,12 +259,13 @@ public class DefaultCluster implements C
Node.setCores(Cores);
Machines.add(Node);
}
- this.setNodes(Machines.toArray(new Machine[Machines.size()]));
+ this.setNodes(Machines.toArray(new Node[Machines.size()]));
return this;
}
- public Job getJobById(String jobID) throws SSHApiException {
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qstat -f " + jobID);
+
+ public JobDescriptor getJobDescriptorById(String jobID) throws SSHApiException {
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(this.installedPath + "qstat -f " + jobID);
StandardOutReader stdOutReader = new StandardOutReader();
CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
@@ -265,9 +274,7 @@ public class DefaultCluster implements C
}
String result = stdOutReader.getStdOutput();
String[] info = result.split("\n");
- Job jobDescriptor = new Job();
- String header = "";
- String value = "";
+ JobDescriptor jobDescriptor = new JobDescriptor();
String[] line;
for (int i = 0; i < info.length; i++) {
if (info[i].contains("=")) {
@@ -276,9 +283,9 @@ public class DefaultCluster implements C
line = info[i].split(":", 2);
}
if (line.length >= 2) {
- header = line[0].trim();
+ String header = line[0].trim();
log.debug("Header = " + header);
- value = line[1].trim();
+ String value = line[1].trim();
log.debug("value = " + value);
if (header.equals("Variable_List")) {
@@ -356,93 +363,100 @@ public class DefaultCluster implements C
try {
SSHUtils.scpTo(rFile, lFile, session);
} catch (IOException e) {
- new SSHApiException("Faile during scping local file:" + lFile + " to remote file "
+ throw new SSHApiException("Failed during scping local file:" + lFile + " to remote file "
+ serverInfo.getHost() + ":rFile", e);
} catch (JSchException e) {
- new SSHApiException("Faile during scping local file:" + lFile + " to remote file "
+ throw new SSHApiException("Failed during scping local file:" + lFile + " to remote file "
+ serverInfo.getHost() + ":rFile", e);
}
}
-
- private static int checkAck(InputStream in) throws IOException {
- int b = in.read();
- if (b == 0) return b;
- if (b == -1) return b;
-
- if (b == 1 || b == 2) {
- StringBuffer sb = new StringBuffer();
- int c;
- do {
- c = in.read();
- sb.append((char) c);
- }
- while (c != '\n');
- if (b == 1) { // error
- System.out.print(sb.toString());
- }
- if (b == 2) { // fatal error
- System.out.print(sb.toString());
- }
- }
- return b;
- }
-
- public String submitAsyncJob(Job jobDescriptor, JobSubmissionListener listener) throws SSHApiException {
-// final Cluster cluster = this;
- String jobID = this.submitAsyncJob(jobDescriptor);
-// final JobSubmissionListener jobSubmissionListener = listener;
- try {
- // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
- // to configure this.
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- log.error("Error during job status monitoring");
- throw new SSHApiException("Error during job status monitoring", e);
- }
- // Get the job status first
- try {
//
-// Thread t = new Thread() {
-// @Override
-// public void run() {
-// try {
- Job jobById = this.getJobById(jobID);
- while (true) {
- while (!jobById.getStatus().equals(JobStatus.C.toString())) {
- if (!jobById.getStatus().equals(listener.getJobStatus().toString())) {
- listener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
- listener.statusChanged(jobById);
- }
- Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
-
- jobById = this.getJobById(jobID);
- }
- //Set the job status to Complete
- listener.setJobStatus(JobStatus.C);
- listener.statusChanged(jobById);
- break;
- }
-// } catch (InterruptedException e) {
-// log.error("Error listening to the submitted job", e);
-// } catch (SSHApiException e) {
-// log.error("Error listening to the submitted job", e);
+// public String submitAsyncJob(Job jobDescriptor, JobSubmissionListener listener) throws SSHApiException {
+//// final Cluster cluster = this;
+// String jobID = this.submitBatchJob(jobDescriptor);
+//// final JobSubmissionListener jobSubmissionListener = listener;
+// try {
+// // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
+// // to configure this.
+// Thread.sleep(5000);
+// } catch (InterruptedException e) {
+// log.error("Error during job status monitoring");
+// throw new SSHApiException("Error during job status monitoring", e);
+// }
+// // Get the job status first
+// try {
+////
+//// Thread t = new Thread() {
+//// @Override
+//// public void run() {
+//// try {
+// Job jobById = this.getJobDescriptorById(jobID);
+// while (true) {
+// while (!jobById.getStatus().equals(JobStatus.C.toString())) {
+// if (!jobById.getStatus().equals(listener.getJobStatus().toString())) {
+// listener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
+// listener.statusChanged(jobById);
// }
+// Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+//
+// jobById = this.getJobDescriptorById(jobID);
// }
-// };
- // This thread runs until the program termination, so that use can provide
- // any action in onChange method of the listener, without worrying for waiting in the caller thread.
- //t.setDaemon(true);
-// t.start();
- } catch (Exception e) {
- log.error("Error during job status monitoring");
- throw new SSHApiException("Error during job status monitoring", e);
+// //Set the job status to Complete
+// listener.setJobStatus(JobStatus.C);
+// listener.statusChanged(jobById);
+// break;
+// }
+//// } catch (InterruptedException e) {
+//// log.error("Error listening to the submitted job", e);
+//// } catch (SSHApiException e) {
+//// log.error("Error listening to the submitted job", e);
+//// }
+//// }
+//// };
+// // This thread runs until the program termination, so that use can provide
+// // any action in onChange method of the listener, without worrying for waiting in the caller thread.
+// //t.setDaemon(true);
+//// t.start();
+// } catch (Exception e) {
+// log.error("Error during job status monitoring");
+// throw new SSHApiException("Error during job status monitoring", e);
+// }
+// return jobID; //To change body of implemented methods use File | Settings | File Templates.
+// }
+
+ public JobStatus getJobStatus(String jobID) throws SSHApiException {
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(this.installedPath + "qstat -f " + jobID);
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
+ // check the standard error, incase user gave wrong jobID
+ if (!stdOutReader.getErrorifAvailable().equals("")) {
+ throw new SSHApiException(stdOutReader.getStandardError().toString());
+ }
+ String result = stdOutReader.getStdOutput();
+ String[] info = result.split("\n");
+ String header = "";
+ String value = "";
+ String[] line;
+ for (String anInfo : info) {
+ if (anInfo.contains("=")) {
+ line = anInfo.split("=", 2);
+ } else {
+ line = anInfo.split(":", 2);
+ }
+ if (line.length >= 2) {
+ } else if ("job_state".equals(header)) {
+ return JobStatus.valueOf(value);
+ } else {
+ throw new SSHApiException(stdOutReader.getStandardError().toString());
+ }
}
- return jobID; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
- public Job cancelJob(String jobID) throws SSHApiException {
- RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qdel " + jobID);
+ public JobDescriptor cancelJob(String jobID) throws SSHApiException {
+ RawCommandInfo rawCommandInfo = new RawCommandInfo(this.installedPath + "qdel " + jobID);
StandardOutReader stdOutReader = new StandardOutReader();
CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
@@ -450,12 +464,12 @@ public class DefaultCluster implements C
throw new SSHApiException(stdOutReader.getStandardError().toString());
}
- Job jobById = this.getJobById(jobID);
- if(jobById.getStatus().equals(JobStatus.C.toString())) {
- log.info("Job Cancel operation was successful !");
+ JobDescriptor jobById = this.getJobDescriptorById(jobID);
+ if (CommonUtils.isJobFinished(jobById)) {
+ log.debug("Job Cancel operation was successful !");
return jobById;
- }else {
- log.info("Job Cancel operation was not successful !");
+ } else {
+ log.debug("Job Cancel operation was not successful !");
return null;
}
}
@@ -475,11 +489,11 @@ public class DefaultCluster implements C
/**
* @return cluster Nodes as array of machines
*/
- public Machine[] getNodes() {
+ public Node[] getNodes() {
return Nodes;
}
- public void setNodes(Machine[] Nodes) {
+ public void setNodes(Node[] Nodes) {
this.Nodes = Nodes;
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java Mon Sep 30 20:08:23 2013
@@ -21,7 +21,7 @@
package org.apache.airavata.gsi.ssh.listener;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.JobStatus;
/**
@@ -35,12 +35,20 @@ public abstract class JobSubmissionListe
private JobStatus jobStatus = JobStatus.U;
/**
- * this will get called during job status change
+ * This can be usd to perform some operation during status change
*
* @param jobDescriptor
* @throws SSHApiException
*/
- public abstract void statusChanged(Job jobDescriptor) throws SSHApiException;
+ public abstract void statusChanged(JobDescriptor jobDescriptor) throws SSHApiException;
+
+ /**
+ * This can be usd to perform some operation during status change
+ * @param jobStatus
+ * @throws SSHApiException
+ */
+ public abstract void statusChanged(JobStatus jobStatus) throws SSHApiException;
+
public JobStatus getJobStatus() {
return jobStatus;
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java Mon Sep 30 20:08:23 2013
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.gsi.ssh.util;
-import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.JobStatus;
public class CommonUtils {
@@ -30,11 +30,26 @@ public class CommonUtils {
* @param job
* @return
*/
- public static boolean isJobFinished(Job job) {
+ public static boolean isJobFinished(JobDescriptor job) {
if (JobStatus.C.toString().equals(job.getStatus())) {
return true;
} else {
return false;
}
}
+
+ /**
+ * This will read
+ * @param maxWalltime
+ * @return
+ */
+ public static String maxWallTimeCalculator(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime + ":00";
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes + ":00";
+ }
+ }
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java Mon Sep 30 20:08:23 2013
@@ -91,7 +91,7 @@ public class SSHUtils {
}
JSch jsch = new ExtendedJSch();
- log.info("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - "
+ log.debug("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - "
+ serverInfo.getUserName());
Session session = null;
Modified: airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt (original)
+++ airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt Mon Sep 30 20:08:23 2013
@@ -25,11 +25,18 @@
export<xsl:text> </xsl:text><xsl:value-of select="."/>
<xsl:text>
</xsl:text>
</xsl:for-each>
+<xsl:for-each select="ns:preJobCommands/ns:command">
+ <xsl:value-of select="."/><xsl:text> </xsl:text>
+ </xsl:for-each>
cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text>
</xsl:text>
<xsl:value-of select="ns:executablePath"/><xsl:text> </xsl:text>
<xsl:for-each select="ns:inputs/ns:input">
<xsl:value-of select="."/><xsl:text> </xsl:text>
</xsl:for-each>
+<xsl:for-each select="ns:postJobCommands/ns:command">
+ <xsl:value-of select="."/><xsl:text> </xsl:text>
+</xsl:for-each>
+
</xsl:template>
</xsl:stylesheet>
\ No newline at end of file
Modified: airavata/sandbox/gsissh/src/main/resources/gsissh.properties
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/resources/gsissh.properties?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/resources/gsissh.properties (original)
+++ airavata/sandbox/gsissh/src/main/resources/gsissh.properties Mon Sep 30 20:08:23 2013
@@ -1,5 +1,4 @@
# Specifies system level configurations as a key/value pairs.
StrictHostKeyChecking=no
-polling.frequency=60000
ssh.session.timeout=360000
\ No newline at end of file
Modified: airavata/sandbox/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd (original)
+++ airavata/sandbox/gsissh/src/main/resources/schemas/PBSJobDescriptor.xsd Mon Sep 30 20:08:23 2013
@@ -47,8 +47,22 @@
<element name="usedMem" type="xsd:string" minOccurs="0" maxOccurs="1"/>
<element name="submitArgs" type="xsd:string" minOccurs="0" maxOccurs="1"/>
<element name="variableList" type="xsd:string" minOccurs="0" maxOccurs="1"/>
+ <element name="preJobCommands" type="gsissh:preJobCommands" minOccurs="0" maxOccurs="1"/>
+ <element name="postJobCommands" type="gsissh:postJobCommands" minOccurs="0" maxOccurs="1"/>
</sequence>
</complexType>
+
+ <complexType name="preJobCommands">
+ <sequence>
+ <element name="command" type="xsd:string" minOccurs="0" maxOccurs="unbounded" />
+ </sequence>
+ </complexType>
+
+ <complexType name="postJobCommands">
+ <sequence>
+ <element name="command" type="xsd:string" minOccurs="0" maxOccurs="unbounded" />
+ </sequence>
+ </complexType>
<complexType name="inputList">
<sequence>
<element name="input" type="xsd:string" minOccurs="0" maxOccurs="unbounded" />
Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java?rev=1527755&r1=1527754&r2=1527755&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java Mon Sep 30 20:08:23 2013
@@ -23,9 +23,11 @@ package org.apache.airavata.gsi.ssh.impl
import junit.framework.Assert;
import org.apache.airavata.gsi.ssh.api.*;
-import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.config.ConfigReader;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -43,7 +45,7 @@ import java.util.Random;
public class DefaultSSHApiTest {
-
+ private static final Logger log = LoggerFactory.getLogger(PBSCluster.class);
private String myProxyUserName;
private String myProxyPassword;
private String certificateLocation;
@@ -53,9 +55,9 @@ public class DefaultSSHApiTest {
@BeforeTest
public void setUp() throws Exception {
-// System.setProperty("myproxy.user", "ogce");
-// System.setProperty("myproxy.password", "");
-// System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+ System.setProperty("myproxy.user", "ogce");
+ System.setProperty("myproxy.password", "Jdas7wph");
+ System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
myProxyUserName = System.getProperty("myproxy.user");
myProxyPassword = System.getProperty("myproxy.password");
@@ -87,7 +89,7 @@ public class DefaultSSHApiTest {
// Create authentication
AuthenticationInfo authenticationInfo
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
- 7512, 17280000,certificateLocation);
+ 7512, 17280000, certificateLocation);
// Create command
CommandInfo commandInfo = new RawCommandInfo("/bin/ls");
@@ -108,22 +110,22 @@ public class DefaultSSHApiTest {
// Create authentication
AuthenticationInfo authenticationInfo
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
- 7512, 17280000,certificateLocation);
+ 7512, 17280000, certificateLocation);
// Server info
ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
- Cluster trestles = new DefaultCluster(serverInfo, authenticationInfo);
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
// Execute command
System.out.println("Target PBS file path: " + workingDirectory);
System.out.println("Local PBS File path: " + pbsFilePath);
- Job jobDescriptor = new Job();
+ JobDescriptor jobDescriptor = new JobDescriptor();
//Here we give working directory as a file name to replace the file, to allow multiple test runs with the same
//file name
jobDescriptor.setWorkingDirectory(workingDirectory);
- String jobID = trestles.submitAsyncJobWithPBS(pbsFilePath, workingDirectory);
+ String jobID = pbsCluster.submitBatchJobWithPBS(pbsFilePath, workingDirectory);
System.out.println("JobID returned : " + jobID);
}
@@ -132,13 +134,13 @@ public class DefaultSSHApiTest {
// Create authentication
AuthenticationInfo authenticationInfo
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
- 7512, 17280000,certificateLocation);
+ 7512, 17280000, certificateLocation);
// Server info
ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
- Cluster trestles = new DefaultCluster(serverInfo, authenticationInfo);
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
// Execute command
@@ -146,7 +148,7 @@ public class DefaultSSHApiTest {
System.out.println("Local PBS File path: " + pbsFilePath);
String workingDirectory = File.separator + "home" + File.separator + "ogce" + File.separator + "gsissh";
// constructing the job object
- Job jobDescriptor = new Job();
+ JobDescriptor jobDescriptor = new JobDescriptor();
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
@@ -165,12 +167,12 @@ public class DefaultSSHApiTest {
jobDescriptor.setInputValues(inputs);
//finished construction of job object
System.out.println(jobDescriptor.toXML());
- jobID = trestles.submitAsyncJob(jobDescriptor);
+ jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println("JobID returned : " + jobID);
// Cluster cluster = sshApi.getCluster(serverInfo, authenticationInfo);
Thread.sleep(1000);
- Job jobById = trestles.getJobById(jobID);
+ JobDescriptor jobById = pbsCluster.getJobDescriptorById(jobID);
//printing job data got from previous call
AssertJUnit.assertEquals(jobById.getJobId(), jobID);
@@ -194,8 +196,6 @@ public class DefaultSSHApiTest {
}
-
-
@Test
public void testGetCluster() throws Exception {
// AuthenticationInfo authenticationInfo
@@ -220,18 +220,18 @@ public class DefaultSSHApiTest {
// Create authentication
AuthenticationInfo authenticationInfo
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
- 7512, 17280000,certificateLocation);
+ 7512, 17280000, certificateLocation);
// Server info
ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
- Cluster trestles = new DefaultCluster(serverInfo, authenticationInfo);
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
// Execute command
System.out.println("Target PBS file path: " + workingDirectory);
System.out.println("Local PBS File path: " + pbsFilePath);
String workingDirectory = File.separator + "home" + File.separator + "ogce" + File.separator + "gsissh";
- Job jobDescriptor = new Job();
+ JobDescriptor jobDescriptor = new JobDescriptor();
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
@@ -250,7 +250,7 @@ public class DefaultSSHApiTest {
jobDescriptor.setInputValues(inputs);
System.out.println(jobDescriptor.toXML());
try {
- String jobID = trestles.submitAsyncJob(jobDescriptor);
+ String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println("JobID returned : " + jobID);
} catch (SSHApiException e) {
System.out.println(e.getMessage());
@@ -262,20 +262,20 @@ public class DefaultSSHApiTest {
// Create authentication
AuthenticationInfo authenticationInfo
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
- 7512, 17280000,certificateLocation);
+ 7512, 17280000, certificateLocation);
// Server info
ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
- Cluster trestles = new DefaultCluster(serverInfo, authenticationInfo);
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin1/");
// Execute command
System.out.println("Target PBS file path: " + workingDirectory);
System.out.println("Local PBS File path: " + pbsFilePath);
String workingDirectory = File.separator + "home" + File.separator + "ogce" + File.separator + "gsissh";
- Job jobDescriptor = new Job();
+ JobDescriptor jobDescriptor = new JobDescriptor();
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
@@ -286,7 +286,7 @@ public class DefaultSSHApiTest {
jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
jobDescriptor.setNodes(1);
jobDescriptor.setProcessesPerNode(1);
- jobDescriptor.setMaxWallTime("1:00:00");
+// jobDescriptor.setMaxWallTime("1:00:00");
jobDescriptor.setQueueName("normal");
jobDescriptor.setAcountString("sds128");
List<String> inputs = new ArrayList<String>();
@@ -294,9 +294,56 @@ public class DefaultSSHApiTest {
jobDescriptor.setInputValues(inputs);
System.out.println(jobDescriptor.toXML());
DefaultJobSubmissionListener listener = new DefaultJobSubmissionListener();
- String status = trestles.submitAsyncJob(jobDescriptor, listener);
- while(!listener.isJobDone()){
- Thread.sleep(10000);
+ String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+ try {
+// // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
+// // to configure this.
+// Thread.sleep(5000);
+// } catch (InterruptedException e) {
+// log.error("Error during job status monitoring");
+// throw new SSHApiException("Error during job status monitoring", e);
+// }
+// // Get the job status first
+// try {
+////
+//// Thread t = new Thread() {
+//// @Override
+//// public void run() {
+//// try {
+ // p
+ JobStatus jobStatus = pbsCluster.getJobStatus(jobID);
+ while (true) {
+ while (!jobStatus.equals(JobStatus.C.toString())) {
+ if (!jobStatus.equals(listener.getJobStatus().toString())) {
+ listener.setJobStatus(jobStatus);
+ listener.statusChanged(jobStatus);
+ }
+ Thread.sleep(60000);
+
+ jobStatus = pbsCluster.getJobStatus(jobID);
+ }
+ //Set the job status to Complete
+ listener.setJobStatus(JobStatus.C);
+ listener.statusChanged(jobStatus);
+ break;
+ }
+// } catch (InterruptedException e) {
+// log.error("Error listening to the submitted job", e);
+// } catch (SSHApiException e) {
+// log.error("Error listening to the submitted job", e);
+// }
+// }
+// };
+ // This thread runs until the program termination, so that use can provide
+// // any action in onChange method of the listener, without worrying for waiting in the caller thread.
+ //t.setDaemon(true);
+// t.start();
+ } catch (Exception e) {
+ log.error("Error during job status monitoring");
+ throw new SSHApiException("Error during job status monitoring", e);
+ }
+ while (!listener.isJobDone()) {
+ Thread.sleep(10000);
}
}
@@ -305,20 +352,20 @@ public class DefaultSSHApiTest {
// Create authentication
AuthenticationInfo authenticationInfo
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
- 7512, 17280000,certificateLocation);
+ 7512, 17280000, certificateLocation);
// Server info
ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
- Cluster trestles = new DefaultCluster(serverInfo, authenticationInfo);
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
// Execute command
System.out.println("Target PBS file path: " + workingDirectory);
System.out.println("Local PBS File path: " + pbsFilePath);
String workingDirectory = File.separator + "home" + File.separator + "ogce" + File.separator + "gsissh";
- Job jobDescriptor = new Job();
+ JobDescriptor jobDescriptor = new JobDescriptor();
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
@@ -336,14 +383,14 @@ public class DefaultSSHApiTest {
inputs.add("1000");
jobDescriptor.setInputValues(inputs);
System.out.println(jobDescriptor.toXML());
- String jobID = trestles.submitAsyncJob(jobDescriptor);
+ String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println("Job submitted to successfully : " + jobID);
- Job jobById = trestles.getJobById(jobID);
- if(!CommonUtils.isJobFinished(jobById)) {
- Job job = trestles.cancelJob(jobID);
+ JobDescriptor jobById = pbsCluster.getJobDescriptorById(jobID);
+ if (!CommonUtils.isJobFinished(jobById)) {
+ JobDescriptor job = pbsCluster.cancelJob(jobID);
if (CommonUtils.isJobFinished(job)) {
Assert.assertTrue(true);
- }else{
+ } else {
Assert.assertTrue(true);
}
}