You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/09/09 18:36:23 UTC

svn commit: r1521184 - in /airavata/sandbox/gsissh/src: main/java/org/apache/airavata/gsi/ssh/impl/ main/java/org/apache/airavata/gsi/ssh/listener/ main/java/org/apache/airavata/gsi/ssh/util/ test/java/org/apache/airavata/gsi/ssh/config/ test/java/org/...

Author: lahiru
Date: Mon Sep  9 16:36:23 2013
New Revision: 1521184

URL: http://svn.apache.org/r1521184
Log:
Changes to some of the implementation.

Added:
    airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
      - copied, changed from r1520626, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java
Removed:
    airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java
Modified:
    airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
    airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
    airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
    airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java
    airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java

Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultCluster.java Mon Sep  9 16:36:23 2013
@@ -20,13 +20,14 @@
 */
 package org.apache.airavata.gsi.ssh.impl;
 
+import bsh.This;
 import com.jcraft.jsch.*;
 import org.apache.airavata.gsi.ssh.api.*;
 import org.apache.airavata.gsi.ssh.api.job.Job;
 import org.apache.airavata.gsi.ssh.config.ConfigReader;
 import org.apache.airavata.gsi.ssh.jsch.ExtendedJSch;
 import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
-import org.apache.airavata.gsi.ssh.util.SCPUtils;
+import org.apache.airavata.gsi.ssh.util.SSHUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
@@ -116,7 +117,6 @@ public class DefaultCluster implements C
     }
 
 
-
     public String submitAsyncJobWithPBS(String pbsFilePath, String workingDirectory) throws SSHApiException {
 
         this.scpTo(workingDirectory, pbsFilePath);
@@ -250,7 +250,7 @@ public class DefaultCluster implements C
         RawCommandInfo rawCommandInfo = new RawCommandInfo("/opt/torque/bin/qstat -f " + jobID);
 
         StandardOutReader stdOutReader = new StandardOutReader();
-        CommandExecutor.executeCommand(rawCommandInfo,this.getSession(), stdOutReader);
+        CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
         if (!stdOutReader.getErrorifAvailable().equals("")) {
             throw new SSHApiException(stdOutReader.getStandardError().toString());
         }
@@ -345,13 +345,13 @@ public class DefaultCluster implements C
 
     public void scpTo(String rFile, String lFile) throws SSHApiException {
         try {
-            SCPUtils.scpTo(rFile, lFile, session);
+            SSHUtils.scpTo(rFile, lFile, session);
         } catch (IOException e) {
             new SSHApiException("Faile during scping local file:" + lFile + " to remote file "
-                    + serverInfo.getHost() + ":rFile" , e);
+                    + serverInfo.getHost() + ":rFile", e);
         } catch (JSchException e) {
             new SSHApiException("Faile during scping local file:" + lFile + " to remote file "
-                    + serverInfo.getHost() + ":rFile" , e);
+                    + serverInfo.getHost() + ":rFile", e);
         }
     }
 
@@ -380,31 +380,56 @@ public class DefaultCluster implements C
     }
 
     public String submitAsyncJob(Job jobDescriptor, JobSubmissionListener listener) throws SSHApiException {
-        String jobID = this.submitAsyncJob(jobDescriptor);
+        final Cluster cluster = this;
+        final String jobID = this.submitAsyncJob(jobDescriptor);
+        final JobSubmissionListener jobSubmissionListener = listener;
         try {
-            Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+            // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
+            // to configure this.
+            Thread.sleep(5000);
         } catch (InterruptedException e) {
             log.error("Error during job status monitoring");
             throw new SSHApiException("Error during job status monitoring", e);
         }
         // Get the job status first
-        Job jobById = this.getJobById(jobID);
+        try {
 
-        while (!jobById.getStatus().equals(JobStatus.C.toString())) {
-            if (!jobById.getStatus().equals(listener.getJobStatus().toString())) {
-                listener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
-                listener.statusChanged(jobById);
-            }
-            try {
-                Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
-            } catch (InterruptedException e) {
-                log.error("Error during job status monitoring");
-                throw new SSHApiException("Error during job status monitoring", e);
-            }
-            jobById = this.getJobById(jobID);
+            Thread t = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        Job jobById = cluster.getJobById(jobID);
+                        while (true) {
+                            while (!jobById.getStatus().equals(JobStatus.C.toString())) {
+                                if (!jobById.getStatus().equals(jobSubmissionListener.getJobStatus().toString())) {
+                                    jobSubmissionListener.setJobStatus(JobStatus.fromString(jobById.getStatus()));
+                                    jobSubmissionListener.statusChanged(jobById);
+                                }
+                                Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+
+                                jobById = cluster.getJobById(jobID);
+                            }
+                            //Set the job status to Complete
+                            jobSubmissionListener.setJobStatus(JobStatus.C);
+                            jobSubmissionListener.statusChanged(jobById);
+                            Thread.sleep(Long.parseLong(configReader.getConfiguration(POLLING_FREQUENCEY)));
+                        }
+                    } catch (InterruptedException e) {
+                        log.error("Error listening to the submitted job", e);
+                    } catch (SSHApiException e) {
+                        log.error("Error listening to the submitted job", e);
+                    }
+                }
+            };
+            // This thread runs until program termination, so that the user can perform
+            // any action in the onChange method of the listener without having to wait in the caller thread.
+            t.setDaemon(true);
+            t.start();
+        } catch (Exception e) {
+            log.error("Error during job status monitoring");
+            throw new SSHApiException("Error during job status monitoring", e);
         }
-        listener.statusChanged(jobById);
-        return listener.getJobStatus().toString();  //To change body of implemented methods use File | Settings | File Templates.
+        return jobID;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void setServerInfo(ServerInfo serverInfo) {
@@ -419,7 +444,7 @@ public class DefaultCluster implements C
         this.session = session;
     }
 
-     /**
+    /**
      * @return cluster Nodes as array of machines
      */
     public Machine[] getNodes() {

Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/DefaultJobSubmissionListener.java Mon Sep  9 16:36:23 2013
@@ -29,4 +29,12 @@ public class DefaultJobSubmissionListene
     public void statusChanged(Job jobDescriptor) throws SSHApiException {
         System.out.println("Job status has changed : " + jobDescriptor.getStatus());
     }
+
+    @Override
+    public boolean isJobDone() throws SSHApiException {
+        if(getJobStatus().equals(JobStatus.C)){
+            return true;
+        }
+        return false;
+    }
 }

Modified: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/listener/JobSubmissionListener.java Mon Sep  9 16:36:23 2013
@@ -49,4 +49,26 @@ public abstract class JobSubmissionListe
     public void setJobStatus(JobStatus jobStatus) {
         this.jobStatus = jobStatus;
     }
+
+    /**
+     * This method is used to block the process until the currentStatus of the job is DONE or FAILED
+     */
+    public void waitFor()  throws SSHApiException{
+        while (!isJobDone()) {
+            synchronized (this) {
+
+                try {
+                    wait();
+                } catch (InterruptedException e) {}
+            }
+        }
+    }
+
+    /**
+     * Based on the implementation, the user can define how to decide the job done
+     * scenario
+     * @return
+     * @throws SSHApiException
+     */
+    public abstract boolean isJobDone() throws SSHApiException;
 }

Copied: airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java (from r1520626, airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java)
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java?p2=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java&p1=airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java&r1=1520626&r2=1521184&rev=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SCPUtils.java (original)
+++ airavata/sandbox/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java Mon Sep  9 16:36:23 2013
@@ -33,8 +33,8 @@ import java.io.*;
 /**
  * This class is going to be useful to SCP a file to a remote grid machine using my proxy credentials
  */
-public class SCPUtils {
-    private static final org.slf4j.Logger log = LoggerFactory.getLogger(SCPUtils.class);
+public class SSHUtils {
+    private static final org.slf4j.Logger log = LoggerFactory.getLogger(SSHUtils.class);
 
     static {
         JSch.setConfig("gssapi-with-mic.x509", "org.apache.airavata.gsi.ssh.GSSContextX509");
@@ -55,7 +55,7 @@ public class SCPUtils {
      * @param certificateLocation
      * @param configReader
      */
-    public SCPUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String certificateLocation, ConfigReader configReader) {
+    public SSHUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String certificateLocation, ConfigReader configReader) {
         System.setProperty("X509_CERT_DIR", certificateLocation);
         this.serverInfo = serverInfo;
         this.authenticationInfo = authenticationInfo;
@@ -68,7 +68,7 @@ public class SCPUtils {
      * @param authenticationInfo
      * @param configReader
      */
-    public SCPUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo
+    public SSHUtils(ServerInfo serverInfo, AuthenticationInfo authenticationInfo
             , ConfigReader configReader) {
         this.serverInfo = serverInfo;
         this.authenticationInfo = authenticationInfo;

Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/config/SCPToTest.java Mon Sep  9 16:36:23 2013
@@ -21,7 +21,7 @@
 package org.apache.airavata.gsi.ssh.config;
 
 import org.apache.airavata.gsi.ssh.api.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.util.SCPUtils;
+import org.apache.airavata.gsi.ssh.util.SSHUtils;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.impl.MyProxyAuthenticationInfo;
 import org.testng.annotations.BeforeTest;
@@ -66,7 +66,7 @@ public class SCPToTest {
                 = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
                 7512, 17280000,certificateLocation);
         ServerInfo serverInfo = new ServerInfo("ogce" ,"trestles.sdsc.edu");
-        SCPUtils scpTo = new SCPUtils(serverInfo,authenticationInfo,this.certificateLocation,new ConfigReader());
+        SSHUtils scpTo = new SSHUtils(serverInfo,authenticationInfo,this.certificateLocation,new ConfigReader());
         scpTo.scpTo(rFilePath, lFilePath);
     }
 }

Modified: airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java
URL: http://svn.apache.org/viewvc/airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java?rev=1521184&r1=1521183&r2=1521184&view=diff
==============================================================================
--- airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java (original)
+++ airavata/sandbox/gsissh/src/test/java/org/apache/airavata/gsi/ssh/impl/DefaultSSHApiTest.java Mon Sep  9 16:36:23 2013
@@ -52,7 +52,7 @@ public class DefaultSSHApiTest {
     @BeforeTest
     public void setUp() throws Exception {
         System.setProperty("myproxy.user", "ogce");
-        System.setProperty("myproxy.password", "Jdas7wph");
+        System.setProperty("myproxy.password", "");
         System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
         myProxyUserName = System.getProperty("myproxy.user");
         myProxyPassword = System.getProperty("myproxy.password");
@@ -284,8 +284,11 @@ public class DefaultSSHApiTest {
         inputs.add("Hello World !!");
         jobDescriptor.setInputValues(inputs);
         System.out.println(jobDescriptor.toXML());
-        String status = trestles.submitAsyncJob(jobDescriptor, new DefaultJobSubmissionListener());
-        System.out.println("Job has finished with the status : " + status);
+        DefaultJobSubmissionListener listener = new DefaultJobSubmissionListener();
+        String status = trestles.submitAsyncJob(jobDescriptor, listener);
+        while(!listener.isJobDone()){
+           Thread.sleep(10000);
+        }
     }
 
 }