You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/09/09 18:36:23 UTC
svn commit: r1521184 - in /airavata/sandbox/gsissh/src:
main/java/org/apache/airavata/gsi/ssh/impl/
main/java/org/apache/airavata/gsi/ssh/listener/
main/java/org/apache/airavata/gsi/ssh/util/
test/java/org/apache/airavata/gsi/ssh/config/ test/java/org/...
Author: lahiru
Date: Mon Sep 9 16:36:23 2013
New Revision: 1521184
URL: http://svn.apache.org/r1521184
Log:
changes to some of the implementaiton.
Added:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
- copied, changed from r1520626, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java
Removed:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java
Modified:
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java
airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java Mon Sep 9 16:36:23 2013
@@ -20,13 +20,14 @@
*/
package org.apache.airavata.gsi.ssh.impl;
+import bsh.This;
import com.jcraft.jsch.*;
import org.apache.airavata.gsi.ssh.api.*;
import org.apache.airavata.gsi.ssh.api.job.Job;
import org.apache.airavata.gsi.ssh.config.ConfigReader;
import org.apache.airavata.gsi.ssh.jsch.ExtendedJSch;
import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
-import org.apache.airavata.gsi.ssh.util.SCPUtils;
+import org.apache.airavata.gsi.ssh.util.SSHUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
@@ -116,7 +117,6 @@ public class DefaultCluster implements C
}
-
public String submitAsyncJobWithPBS(String pbsFilePath, String workingDirectory) throws SSHApiException {
this.scpTo(workingDirectory, pbsFilePath);
@@ -250,7 +250,7 @@ public class DefaultCluster implements C
RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qstat -f " + jobID);
StandardOutReader stdOutReader = new StandardOutReader();
- CommandExecutor.executeCommand(rawCommandInfo,this.getSession(), stdOutReader);
+ CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
if (!stdOutReader.getErrorifAvailable().equals("")) {
throw new SSHApiException(stdOutReader.getStandardError().toString());
}
@@ -345,13 +345,13 @@ public class DefaultCluster implements C
public void scpTo(String rFile, String lFile) throws SSHApiException {
try {
- SCPUtils.scpTo(rFile, lFile, session);
+ SSHUtils.scpTo(rFile, lFile, session);
} catch (IOException e) {
new SSHApiException("Faile during scping local file:" + lFile + " to remote file "
- + serverInfo.getHost() + ":rFile" , e);
+ + serverInfo.getHost() + ":rFile", e);
} catch (JSchException e) {
new SSHApiException("Faile during scping local file:" + lFile + " to remote file "
- + serverInfo.getHost() + ":rFile" , e);
+ + serverInfo.getHost() + ":rFile", e);
}
}
@@ -380,31 +380,56 @@ public class DefaultCluster implements C
}
public String submitAsyncJob(Job jobDescriptor, JobSubmissionListener listener) throws SSHApiException {
- String jobID = this.submitAsyncJob(jobDescriptor);
+ final Cluster cluster = this;
+ final String jobID = this.submitAsyncJob(jobDescriptor);
+ final JobSubmissionListener jobSubmissionListener = listener;
try {
- Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+ // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
+ // to configure this.
+ Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("Error during job status monitoring");
throw new SSHApiException("Error during job status monitoring", e);
}
// Get the job status first
- Job jobById = this.getJobById(jobID);
+ try {
- while (!jobById.getStatus().equals(JobStatus.C.toString())) {
- if (!jobById.getStatus().equals(listener.getJobStatus().toString())) {
- listener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
- listener.statusChanged(jobById);
- }
- try {
- Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
- } catch (InterruptedException e) {
- log.error("Error during job status monitoring");
- throw new SSHApiException("Error during job status monitoring", e);
- }
- jobById = this.getJobById(jobID);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Job jobById = cluster.getJobById(jobID);
+ while (true) {
+ while (!jobById.getStatus().equals(JobStatus.C.toString())) {
+ if (!jobById.getStatus().equals(jobSubmissionListener.getJobStatus().toString())) {
+ jobSubmissionListener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
+ jobSubmissionListener.statusChanged(jobById);
+ }
+ Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+
+ jobById = cluster.getJobById(jobID);
+ }
+ //Set the job status to Complete
+ jobSubmissionListener.setJobStatus(JobStatus.C);
+ jobSubmissionListener.statusChanged(jobById);
+ Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+ }
+ } catch (InterruptedException e) {
+ log.error("Error listening to the submitted job", e);
+ } catch (SSHApiException e) {
+ log.error("Error listening to the submitted job", e);
+ }
+ }
+ };
+ // This thread runs until the program termination, so that use can provide
+ // any action in onChange method of the listener, without worrying for waiting in the caller thread.
+ t.setDaemon(true);
+ t.start();
+ } catch (Exception e) {
+ log.error("Error during job status monitoring");
+ throw new SSHApiException("Error during job status monitoring", e);
}
- listener.statusChanged(jobById);
- return listener.getJobStatus().toString(); //To change body of implemented methods use File | Settings | File Templates.
+ return jobID; //To change body of implemented methods use File | Settings | File Templates.
}
public void setServerInfo(ServerInfo serverInfo) {
@@ -419,7 +444,7 @@ public class DefaultCluster implements C
this.session = session;
}
- /**
+ /**
* @return cluster Nodes as array of machines
*/
public Machine[] getNodes() {
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java Mon Sep 9 16:36:23 2013
@@ -29,4 +29,12 @@ public class DefaultJobSubmissionListene
public void statusChanged(Job jobDescriptor) throws SSHApiException {
System.out.println("Job status has changed : " + jobDescriptor.getStatus());
}
+
+ @Override
+ public boolean isJobDone() throws SSHApiException {
+ if(getJobStatus().equals(JobStatus.C)){
+ return true;
+ }
+ return false;
+ }
}
Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java Mon Sep 9 16:36:23 2013
@@ -49,4 +49,26 @@ public abstract class JobSubmissionListe
public void setJobStatus(JobStatus jobStatus) {
this.jobStatus = jobStatus;
}
+
+ /**
+ * This method is used to block the process until the currentStatus of the job is DONE or FAILED
+ */
+ public void waitFor() throws SSHApiException{
+ while (!isJobDone()) {
+ synchronized (this) {
+
+ try {
+ wait();
+ } catch (InterruptedException e) {}
+ }
+ }
+ }
+
+ /**
+ * BAsed on the implementation user can define how to decide the job done
+ * scenario
+ * @return
+ * @throws SSHApiException
+ */
+ public abstract boolean isJobDone() throws SSHApiException;
}
Copied: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java (from r1520626, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java)
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java?p2=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java&p1=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java&r1=1520626&r2=1521184&rev=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java Mon Sep 9 16:36:23 2013
@@ -33,8 +33,8 @@ import java.io.*;
/**
* This class is going to be useful to SCP a file to a remote grid machine using my proxy credentials
*/
-public class SCPUtils {
- private static final org.slf4j.Logger log = LoggerFactory.getLogger(SCPUtils.class);
+public class SSHUtils {
+ private static final org.slf4j.Logger log = LoggerFactory.getLogger(SSHUtils.class);
static {
JSch.setConfig("gssapi-with-mic.x509", "org.apache.airavata.gsi.ssh.GSSContextX509");
@@ -55,7 +55,7 @@ public class SCPUtils {
* @param certificateLocation
* @param configReader
*/
- public SCPUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String certificateLocation, ConfigReader configReader) {
+ public SSHUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String certificateLocation, ConfigReader configReader) {
System.setProperty("X509_CERT_DIR", certificateLocation);
this.serverInfo = serverInfo;
this.authenticationInfo = authenticationInfo;
@@ -68,7 +68,7 @@ public class SCPUtils {
* @param authenticationInfo
* @param configReader
*/
- public SCPUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo
+ public SSHUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo
, ConfigReader configReader) {
this.serverInfo = serverInfo;
this.authenticationInfo = authenticationInfo;
Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java Mon Sep 9 16:36:23 2013
@@ -21,7 +21,7 @@
package org.apache.airavata.gsi.ssh.config;
import org.apache.airavata.gsi.ssh.api.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.util.SCPUtils;
+import org.apache.airavata.gsi.ssh.util.SSHUtils;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.impl.MyProxyAuthenticationInfo;
import org.testng.annotations.BeforeTest;
@@ -66,7 +66,7 @@ public class SCPToTest {
= new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
7512, 17280000,certificateLocation);
ServerInfo serverInfo = new ServerInfo("ogce" ,"trestles.sdsc.edu");
- SCPUtils scpTo = new SCPUtils(serverInfo,authenticationInfo,this.certificateLocation,new ConfigReader());
+ SSHUtils scpTo = new SSHUtils(serverInfo,authenticationInfo,this.certificateLocation,new ConfigReader());
scpTo.scpTo(rFilePath, lFilePath);
}
}
Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java Mon Sep 9 16:36:23 2013
@@ -52,7 +52,7 @@ public class DefaultSSHApiTest {
@BeforeTest
public void setUp() throws Exception {
System.setProperty("myproxy.user", "ogce");
- System.setProperty("myproxy.password", "Jdas7wph");
+ System.setProperty("myproxy.password", "");
System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
myProxyUserName = System.getProperty("myproxy.user");
myProxyPassword = System.getProperty("myproxy.password");
@@ -284,8 +284,11 @@ public class DefaultSSHApiTest {
inputs.add("Hello World !!");
jobDescriptor.setInputValues(inputs);
System.out.println(jobDescriptor.toXML());
- String status = trestles.submitAsyncJob(jobDescriptor, new DefaultJobSubmissionListener());
- System.out.println("Job has finished with the status : " + status);
+ DefaultJobSubmissionListener listener = new DefaultJobSubmissionListener();
+ String status = trestles.submitAsyncJob(jobDescriptor, listener);
+ while(!listener.isJobDone()){
+ Thread.sleep(10000);
+ }
}
}