You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2014/03/18 21:17:47 UTC

git commit: stated using Jcraft for SSHprovider communication. AIRAVATA-1082

Repository: airavata
Updated Branches:
  refs/heads/master e58a4427a -> 31f3883c2


stated using Jcraft for SSHprovider communication. AIRAVATA-1082


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/31f3883c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/31f3883c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/31f3883c

Branch: refs/heads/master
Commit: 31f3883c23f0ae6eb92e306eddfbfa214bfb41c4
Parents: e58a442
Author: raminder <ra...@apache.org>
Authored: Tue Mar 18 16:17:36 2014 -0400
Committer: raminder <ra...@apache.org>
Committed: Tue Mar 18 16:17:36 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |  2 +-
 .../samples/CreateLaunchExperimentUS3.java      |  4 +-
 .../main/resources/airavata-client.properties   | 10 +--
 modules/gfac/gfac-core/pom.xml                  |  8 +-
 .../gfac/provider/impl/SSHProvider.java         | 82 +++++++++++---------
 .../apache/airavata/gsi/ssh/api/Cluster.java    | 10 ++-
 6 files changed, 62 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/31f3883c/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 7d4b129..efcf4a0 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -49,7 +49,7 @@ import java.util.Set;
 public class CreateLaunchExperiment {
 
     //FIXME: Read from a config file
-    public static final String THRIFT_SERVER_HOST = "localhost";
+    public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
     public static final int THRIFT_SERVER_PORT = 8930;
     private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
     private static final String DEFAULT_USER = "defauly.registry.user";

http://git-wip-us.apache.org/repos/asf/airavata/blob/31f3883c/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperimentUS3.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperimentUS3.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperimentUS3.java
index eb76412..24fd58d 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperimentUS3.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperimentUS3.java
@@ -37,7 +37,7 @@ public class CreateLaunchExperimentUS3 {
     public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
     public static final int THRIFT_SERVER_PORT = 8930;
     private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
-    private static final String DEFAULT_USER = "defauly.registry.user";
+    private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     public static void main(String[] args) {
         try {
@@ -70,7 +70,7 @@ public class CreateLaunchExperimentUS3 {
                                      return;
                                  }else {
                                      if (JobState.COMPLETE.equals(jobStatus.getJobState())) {
-                                         System.out.println("Job completed Job ID: " + key);
+                                         System.out.println("Job completed Job ID: " + jobStatus.getJobState().toString());
                                          return;
                                      }else{
                                         System.out.println("Job ID:" + key + jobStatuses.get(key).getJobState().toString());

http://git-wip-us.apache.org/repos/asf/airavata/blob/31f3883c/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties b/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
index 2a463b1..83be989 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
@@ -34,14 +34,14 @@ class.registry.accessor=org.apache.airavata.persistance.registry.jpa.impl.Airava
 
 ########################Registry JPA Implementation Settings########################
 #for mysql [AiravataJPARegistry]
-#registry.jdbc.driver=com.mysql.jdbc.Driver
-#registry.jdbc.url=jdbc:mysql://localhost:3306/persistent_data
+registry.jdbc.driver=com.mysql.jdbc.Driver
+registry.jdbc.url=jdbc:mysql://gw111.iu.xsede.org:3306/airavata
 
 #for derby [AiravataJPARegistry]
-registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
+#registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+#registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
 registry.jdbc.user=airavata
-registry.jdbc.password=airavata
+registry.jdbc.password=airavata12
 start.derby.server.mode=true
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/31f3883c/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 5e0965d..070f16c 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -95,13 +95,7 @@
             <version>${project.version}</version>
         </dependency>
 
-        <!-- SSH -->
-        <dependency>
-            <groupId>net.schmizz</groupId>
-            <artifactId>sshj</artifactId>
-            <version>0.8.0</version>
-        </dependency>
-
+   
         <!-- Credential Store -->
         <dependency>
             <groupId>org.apache.airavata</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/31f3883c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index 1b44c1b..f857d79 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@ -31,13 +31,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import net.schmizz.sshj.connection.ConnectionException;
-import net.schmizz.sshj.connection.channel.direct.Session;
-import net.schmizz.sshj.connection.channel.direct.Session.Command;
-import net.schmizz.sshj.transport.TransportException;
-import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
 
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.commons.gfac.type.MappingFactory;
@@ -49,6 +42,11 @@ import org.apache.airavata.gfac.context.security.SSHSecurityContext;
 import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.NameValuePairType;
@@ -64,7 +62,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
  */
 public class SSHProvider extends AbstractProvider implements GFacProvider{
     private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
-    private SSHSecurityContext securityContext;
+    private Cluster cluster;
     private String jobID = null;
     private String taskID = null;
     // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
@@ -75,18 +73,17 @@ public class SSHProvider extends AbstractProvider implements GFacProvider{
     	taskID = jobExecutionContext.getTaskData().getTaskID();
 		if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
             jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
-
-            securityContext = (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
+            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+            
             ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
             String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
             details.setJobDescription(remoteFile);
             GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
             log.info(remoteFile);
             try {
-                File runscript = createShellScript(jobExecutionContext);
-                SCPFileTransfer fileTransfer = securityContext.getSSHClient().newSCPFileTransfer();
-                fileTransfer.upload(runscript.getAbsolutePath(), remoteFile);
-            } catch (IOException e) {
+            	File runscript = createShellScript(jobExecutionContext);
+                cluster.scpTo(remoteFile, runscript.getAbsolutePath());
+            } catch (Exception e) {
                 throw new GFacProviderException(e.getLocalizedMessage(), e);
             }
         }else{
@@ -98,41 +95,32 @@ public class SSHProvider extends AbstractProvider implements GFacProvider{
     public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
         if (gsiSshProvider == null) {
             ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-            Session session = null;
             try {
-                session = securityContext.getSession(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress());
                 /*
                  * Execute
                  */
                 String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
                 details.setJobDescription(execuable);
                 GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
-                Command cmd = session.exec("/bin/chmod 755 " + execuable + "; " + execuable);
-                log.info("stdout=" + GFacUtils.readFromStream(session.getInputStream()));
-                cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT, TimeUnit.SECONDS);
+                RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
 
-                /*
-                 * check return value. usually not very helpful to draw conclusions
-                 * based on return values so don't bother. just provide warning in
-                 * the log messages
-                 */
-                if (cmd.getExitStatus() != 0) {
-                    log.error("Process finished with non zero return value. Process may have failed");
-                } else {
-                    log.info("Process finished with return value of zero.");
-                }
+                StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
+                
+                CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
+                String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
+
+                log.info("stdout=" + stdOutputString);
+             
                 GFacUtils.updateJobStatus(details, JobState.COMPLETE);
-            } catch (ConnectionException e) {
-                throw new GFacProviderException(e.getMessage(), e);
-            } catch (TransportException e) {
-                throw new GFacProviderException(e.getMessage(), e);
-            } catch (IOException e) {
-                throw new GFacProviderException(e.getMessage(), e);
-            }catch (Exception e) {
+            } catch (Exception e) {
                 throw new GFacProviderException(e.getMessage(), e);
             } finally {
-                if (securityContext != null) {
-                    securityContext.closeSession(session);
+                if (cluster != null) {
+                	try {
+						cluster.disconnect();
+					} catch (SSHApiException e) {
+						throw new GFacProviderException(e.getMessage(), e);
+					}
                 }
             }
         } else {
@@ -245,5 +233,23 @@ public class SSHProvider extends AbstractProvider implements GFacProvider{
             }
         }
     }
+    /**
+     * This method will read standard output and if there's any it will be parsed
+     * @param jobIDReaderCommandOutput
+     * @param errorMsg
+     * @return
+     * @throws SSHApiException
+     */
+    private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+        String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
+        String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
+
+        if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
+            log.error("Standard Error output : " + stdErrorString);
+            throw new SSHApiException(errorMsg + stdErrorString);
+        }
+        return stdOutputString;
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/31f3883c/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
index f8cd543..1f49bc7 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
@@ -25,6 +25,8 @@ import java.util.List;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.JobStatus;
 
+import com.jcraft.jsch.Session;
+
 /**
  * This interface represents a Cluster machine
  * End users of the API can implement this and come up with their own
@@ -110,7 +112,13 @@ public interface Cluster {
      */
     public List<String> listDirectory(String directoryPath) throws SSHApiException;
 
-
+    /**
+     * This method can be used to get created ssh session
+     * to reuse the created session.
+     * @throws SSHApiException
+     */
+    public Session getSession() throws SSHApiException;
+    
     /**
      * This method can be used to close the connections initialized
      * to handle graceful shutdown of the system