You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/09/16 21:54:43 UTC
svn commit: r1523779 - in /airavata/sandbox/gsissh/src:
main/java/org/apache/airavata/gsi/ssh/api/
main/java/org/apache/airavata/gsi/ssh/impl/
main/java/org/apache/airavata/gsi/ssh/listener/
main/java/org/apache/airavata/gsi/ssh/util/ main/resources/ test/java/org/apache/airavata/gsi/ssh/config/ test/java/org/apache/airavata/gsi/ssh/impl/
Author: lahiru
Date: Mon Sep 16 19:54:42 2013
New Revision: 1523779
URL: http://svn.apache.org/r1523779
Log:
modifications to sshapi.
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java
Modified:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/ServerInfo.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java Mon Sep 16 19:54:42 2013
@@ -86,4 +86,12 @@ public interface Cluster {
* @throws SSHApiException
*/
public Job getJobById(String jobID) throws SSHApiException;
+
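+ /**
+ * Cancels the given job by deleting it from the queue.
+ * @param jobID the ID of the job to cancel
+ * @return the job as reported after the cancel request; implementations may return null if the job did not reach the completed state
+ * @throws SSHApiException if the cancel request fails
+ */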
+ public Job cancelJob(String jobID) throws SSHApiException;
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/ServerInfo.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/ServerInfo.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/ServerInfo.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/ServerInfo.java Mon Sep 16 19:54:42 2013
@@ -33,7 +33,7 @@ public class ServerInfo {
this.host = host;
}
- public ServerInfo(String host, String userName, int port) {
+ public ServerInfo(String userName,String host, int port) {
this.host = host;
this.userName = userName;
this.port = port;
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java Mon Sep 16 19:54:42 2013
@@ -20,7 +20,6 @@
*/
package org.apache.airavata.gsi.ssh.impl;
-import bsh.This;
import com.jcraft.jsch.*;
import org.apache.airavata.gsi.ssh.api.*;
import org.apache.airavata.gsi.ssh.api.job.Job;
@@ -37,6 +36,7 @@ import javax.xml.transform.*;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.*;
+import java.net.URL;
import java.security.SecureRandom;
import java.util.ArrayList;
@@ -56,6 +56,7 @@ public class DefaultCluster implements C
public static final String X509_CERT_DIR = "X509_CERT_DIR";
public static final String POLLING_FREQUENCEY = "polling.frequency";
public static final String SSH_SESSION_TIMEOUT = "ssh.session.timeout";
+ public static final String PBSTEMPLATE_XSLT = "PBSTemplate.xslt";
private Machine[] Nodes;
@@ -90,8 +91,8 @@ public class DefaultCluster implements C
try {
session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort());
session.setTimeout(Integer.parseInt(configReader.getConfiguration(SSH_SESSION_TIMEOUT)));
- } catch (JSchException e) {
- throw new SSHApiException("An exception occurred while creating SSH session." +
+ } catch (Exception e){
+ throw new SSHApiException("An exception occurred while creating SSH session." +
"Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() +
" connecting user name - "
+ serverInfo.getUserName(), e);
@@ -139,8 +140,16 @@ public class DefaultCluster implements C
public String submitAsyncJob(Job jobDescriptor) throws SSHApiException {
TransformerFactory factory = TransformerFactory.newInstance();
- String xsltPath = "src" + File.separator + "main" + File.separator + "resources" + File.separator + "PBSTemplate.xslt";
- Source xslt = new StreamSource(new File(xsltPath));
+ URL resource = this.getClass().getClassLoader().getResource(PBSTEMPLATE_XSLT);
+
+ if (resource == null) {
+ String error = "System configuration file '" + DefaultCluster.PBSTEMPLATE_XSLT
+ + "' not found in the classpath";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ Source xslt = new StreamSource(new File(resource.getPath()));
Transformer transformer = null;
StringWriter results = new StringWriter();
File tempPBSFile = null;
@@ -356,7 +365,7 @@ public class DefaultCluster implements C
}
- static int checkAck(InputStream in) throws IOException {
+ private static int checkAck(InputStream in) throws IOException {
int b = in.read();
if (b == 0) return b;
if (b == -1) return b;
@@ -380,9 +389,9 @@ public class DefaultCluster implements C
}
public String submitAsyncJob(Job jobDescriptor, JobSubmissionListener listener) throws SSHApiException {
- final Cluster cluster = this;
- final String jobID = this.submitAsyncJob(jobDescriptor);
- final JobSubmissionListener jobSubmissionListener = listener;
+// final Cluster cluster = this;
+ String jobID = this.submitAsyncJob(jobDescriptor);
+// final JobSubmissionListener jobSubmissionListener = listener;
try {
// Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
// to configure this.
@@ -393,38 +402,38 @@ public class DefaultCluster implements C
}
// Get the job status first
try {
-
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- Job jobById = cluster.getJobById(jobID);
+//
+// Thread t = new Thread() {
+// @Override
+// public void run() {
+// try {
+ Job jobById = this.getJobById(jobID);
while (true) {
while (!jobById.getStatus().equals(JobStatus.C.toString())) {
- if (!jobById.getStatus().equals(jobSubmissionListener.getJobStatus().toString())) {
- jobSubmissionListener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
- jobSubmissionListener.statusChanged(jobById);
+ if (!jobById.getStatus().equals(listener.getJobStatus().toString())) {
+ listener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
+ listener.statusChanged(jobById);
}
Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
- jobById = cluster.getJobById(jobID);
+ jobById = this.getJobById(jobID);
}
//Set the job status to Complete
- jobSubmissionListener.setJobStatus(JobStatus.C);
- jobSubmissionListener.statusChanged(jobById);
- Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+ listener.setJobStatus(JobStatus.C);
+ listener.statusChanged(jobById);
+ break;
}
- } catch (InterruptedException e) {
- log.error("Error listening to the submitted job", e);
- } catch (SSHApiException e) {
- log.error("Error listening to the submitted job", e);
- }
- }
- };
+// } catch (InterruptedException e) {
+// log.error("Error listening to the submitted job", e);
+// } catch (SSHApiException e) {
+// log.error("Error listening to the submitted job", e);
+// }
+// }
+// };
// This thread runs until the program termination, so that the user can provide
// any action in onChange method of the listener, without worrying about waiting in the caller thread.
- t.setDaemon(true);
- t.start();
+ //t.setDaemon(true);
+// t.start();
} catch (Exception e) {
log.error("Error during job status monitoring");
throw new SSHApiException("Error during job status monitoring", e);
@@ -432,6 +441,25 @@ public class DefaultCluster implements C
return jobID; //To change body of implemented methods use File | Settings | File Templates.
}
+ public Job cancelJob(String jobID) throws SSHApiException {
+ RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qdel " + jobID);
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
+ if (!stdOutReader.getErrorifAvailable().equals("")) {
+ throw new SSHApiException(stdOutReader.getStandardError().toString());
+ }
+
+ Job jobById = this.getJobById(jobID);
+ if(jobById.getStatus().equals(JobStatus.C.toString())) {
+ log.info("Job Cancel operation was successful !");
+ return jobById;
+ }else {
+ log.info("Job Cancel operation was not successful !");
+ return null;
+ }
+ }
+
public void setServerInfo(ServerInfo serverInfo) {
this.serverInfo = serverInfo;
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java Mon Sep 16 19:54:42 2013
@@ -27,7 +27,7 @@ import org.apache.airavata.gsi.ssh.liste
public class DefaultJobSubmissionListener extends JobSubmissionListener {
public void statusChanged(Job jobDescriptor) throws SSHApiException {
- System.out.println("Job status has changed : " + jobDescriptor.getStatus());
+ System.out.println("Job status has changed to : " + jobDescriptor.getStatus());
}
@Override
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java Mon Sep 16 19:54:42 2013
@@ -56,7 +56,6 @@ public abstract class JobSubmissionListe
public void waitFor() throws SSHApiException{
while (!isJobDone()) {
synchronized (this) {
-
try {
wait();
} catch (InterruptedException e) {}
Added: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java?rev=1523779&view=auto
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java (added)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/CommonUtils.java Mon Sep 16 19:54:42 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gsi.ssh.util;
+
+import org.apache.airavata.gsi.ssh.api.job.Job;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+public class CommonUtils {
+ /**
+ * Returns true if the given job is finished,
+ * otherwise false.
+ * @param job the job whose status is checked
+ * @return true if the job status equals JobStatus.C, false otherwise
+ */
+ public static boolean isJobFinished(Job job) {
+ if (JobStatus.C.toString().equals(job.getStatus())) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
Modified: airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt (original)
+++ airavata/sandbox/gsissh/src/main/resources/PBSTemplate.xslt Mon Sep 16 19:54:42 2013
@@ -12,6 +12,7 @@
# PBS batch job script built by Globus job manager
#
##PBS -S <xsl:value-of select="ns:shellName"/>
+#PBS -q <xsl:value-of select="ns:queueName"/>
#PBS -m <xsl:value-of select="ns:mailOptions"/>
#PBS -A <xsl:value-of select="ns:acountString"/>
#PBS -l walltime=<xsl:value-of select="ns:maxWallTime"/>
Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java Mon Sep 16 19:54:42 2013
@@ -38,6 +38,9 @@ public class SCPToTest {
@BeforeTest
public void setUp() throws Exception {
+// System.setProperty("myproxy.user", "ogce");
+// System.setProperty("myproxy.password", "");
+// System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
myProxyUserName = System.getProperty("myproxy.user");
myProxyPassword = System.getProperty("myproxy.password");
Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java?rev=1523779&r1=1523778&r2=1523779&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java Mon Sep 16 19:54:42 2013
@@ -21,9 +21,11 @@
package org.apache.airavata.gsi.ssh.impl;
+import junit.framework.Assert;
import org.apache.airavata.gsi.ssh.api.*;
import org.apache.airavata.gsi.ssh.api.job.Job;
import org.apache.airavata.gsi.ssh.config.ConfigReader;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -51,9 +53,9 @@ public class DefaultSSHApiTest {
@BeforeTest
public void setUp() throws Exception {
- System.setProperty("myproxy.user", "ogce");
- System.setProperty("myproxy.password", "");
- System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+// System.setProperty("myproxy.user", "ogce");
+// System.setProperty("myproxy.password", "");
+// System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
myProxyUserName = System.getProperty("myproxy.user");
myProxyPassword = System.getProperty("myproxy.password");
@@ -143,6 +145,7 @@ public class DefaultSSHApiTest {
System.out.println("Target PBS file path: " + workingDirectory);
System.out.println("Local PBS File path: " + pbsFilePath);
String workingDirectory = File.separator + "home" + File.separator + "ogce" + File.separator + "gsissh";
+ // constructing the job object
Job jobDescriptor = new Job();
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
@@ -154,11 +157,13 @@ public class DefaultSSHApiTest {
jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
jobDescriptor.setNodes(1);
jobDescriptor.setProcessesPerNode(1);
+ jobDescriptor.setQueueName("normal");
jobDescriptor.setMaxWallTime("1:00:00");
jobDescriptor.setAcountString("sds128");
List<String> inputs = new ArrayList<String>();
inputs.add("Hello World");
jobDescriptor.setInputValues(inputs);
+ //finished construction of job object
System.out.println(jobDescriptor.toXML());
jobID = trestles.submitAsyncJob(jobDescriptor);
System.out.println("JobID returned : " + jobID);
@@ -166,6 +171,8 @@ public class DefaultSSHApiTest {
// Cluster cluster = sshApi.getCluster(serverInfo, authenticationInfo);
Thread.sleep(1000);
Job jobById = trestles.getJobById(jobID);
+
+ // printing the job data obtained from the previous call
AssertJUnit.assertEquals(jobById.getJobId(), jobID);
System.out.println(jobById.getAcountString());
System.out.println(jobById.getAllEnvExport());
@@ -228,13 +235,14 @@ public class DefaultSSHApiTest {
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
- jobDescriptor.setExecutablePath("/bin/echo");
+ jobDescriptor.setExecutablePath("/bin/sleep");
jobDescriptor.setAllEnvExport(true);
jobDescriptor.setMailOptions("n");
jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
jobDescriptor.setNodes(1);
jobDescriptor.setProcessesPerNode(100);
+ jobDescriptor.setQueueName("normal");
jobDescriptor.setMaxWallTime("1:00:00");
jobDescriptor.setAcountString("sds128");
List<String> inputs = new ArrayList<String>();
@@ -271,7 +279,7 @@ public class DefaultSSHApiTest {
jobDescriptor.setWorkingDirectory(workingDirectory);
jobDescriptor.setShellName("/bin/bash");
jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
- jobDescriptor.setExecutablePath("/bin/echo");
+ jobDescriptor.setExecutablePath("/bin/sleep");
jobDescriptor.setAllEnvExport(true);
jobDescriptor.setMailOptions("n");
jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
@@ -279,9 +287,10 @@ public class DefaultSSHApiTest {
jobDescriptor.setNodes(1);
jobDescriptor.setProcessesPerNode(1);
jobDescriptor.setMaxWallTime("1:00:00");
+ jobDescriptor.setQueueName("normal");
jobDescriptor.setAcountString("sds128");
List<String> inputs = new ArrayList<String>();
- inputs.add("Hello World !!");
+ inputs.add("1000");
jobDescriptor.setInputValues(inputs);
System.out.println(jobDescriptor.toXML());
DefaultJobSubmissionListener listener = new DefaultJobSubmissionListener();
@@ -291,4 +300,54 @@ public class DefaultSSHApiTest {
}
}
+ @Test
+ public void testJobCancel() throws Exception {
+ // Create authentication
+ AuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+ 7512, 17280000,certificateLocation);
+
+ // Server info
+ ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
+
+
+ Cluster trestles = new DefaultCluster(serverInfo, authenticationInfo);
+
+
+ // Execute command
+ System.out.println("Target PBS file path: " + workingDirectory);
+ System.out.println("Local PBS File path: " + pbsFilePath);
+ String workingDirectory = File.separator + "home" + File.separator + "ogce" + File.separator + "gsissh";
+ Job jobDescriptor = new Job();
+ jobDescriptor.setWorkingDirectory(workingDirectory);
+ jobDescriptor.setShellName("/bin/bash");
+ jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+ jobDescriptor.setExecutablePath("/bin/sleep");
+ jobDescriptor.setAllEnvExport(true);
+ jobDescriptor.setMailOptions("n");
+ jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+ jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+ jobDescriptor.setNodes(1);
+ jobDescriptor.setProcessesPerNode(1);
+ jobDescriptor.setMaxWallTime("1:00:00");
+ jobDescriptor.setQueueName("normal");
+ jobDescriptor.setAcountString("sds128");
+ List<String> inputs = new ArrayList<String>();
+ inputs.add("1000");
+ jobDescriptor.setInputValues(inputs);
+ System.out.println(jobDescriptor.toXML());
+ String jobID = trestles.submitAsyncJob(jobDescriptor);
+ System.out.println("Job submitted to successfully : " + jobID);
+ Job jobById = trestles.getJobById(jobID);
+ if(!CommonUtils.isJobFinished(jobById)) {
+ Job job = trestles.cancelJob(jobID);
+ if (CommonUtils.isJobFinished(job)) {
+ Assert.assertTrue(true);
+ }else{
+ Assert.assertTrue(true);
+ }
+ }
+
+ }
+
}