You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by he...@apache.org on 2013/03/09 03:24:55 UTC

svn commit: r1454642 - /airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java

Author: heshan
Date: Sat Mar  9 02:24:55 2013
New Revision: 1454642

URL: http://svn.apache.org/r1454642
Log:
AIRAVATA-202 Executing the job on EC2 and adding the result to jobContext.

Modified:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java?rev=1454642&r1=1454641&r2=1454642&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java Sat Mar  9 02:24:55 2013
@@ -39,11 +39,14 @@ import com.sshtools.j2ssh.transport.publ
 import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
 import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
 import com.sshtools.j2ssh.util.Base64;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.gfac.context.AmazonSecurityContext;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.notification.events.EC2ProviderEvent;
 import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.schemas.gfac.OutputParameterType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
 import org.bouncycastle.openssl.PEMWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -127,6 +130,16 @@ public class EC2Provider implements GFac
         // Connect to the host
         try
         {
+            String outParamName;
+            OutputParameterType[] outputParametersArray = jobExecutionContext.getApplicationContext().
+                    getServiceDescription().getType().getOutputParametersArray();
+            if(outputParametersArray != null) {
+                outParamName = outputParametersArray[0].getParameterName();
+            } else {
+                throw new GFacProviderException("Output parameter name is not set. Therefore, not being able " +
+                        "to filter the job result from standard out ", jobExecutionContext);
+            }
+
             sshClient.connect(properties, new HostKeyVerification() {
                 public boolean verifyHost(String s, SshPublicKey sshPublicKey) throws TransportProtocolException {
                     log.debug("Verifying Host: " + s);
@@ -152,19 +165,21 @@ public class EC2Provider implements GFac
                 log.info("ssh client authentication is complete...");
             }
 
+            // Execute job
             SessionChannelClient session = sshClient.openSessionChannel();
             log.info("ssh session is open successfully...");
             session.requestPseudoTerminal("vt100", 80, 25, 0, 0, "");
             session.startShell();
             session.getOutputStream().write(command2.getBytes());
 
-            InputStream in = session.getInputStream();
-            byte buffer[] = new byte[255];
-            int read;
-            while((read = in.read(buffer)) > 0) {
-                String out = new String(buffer, 0, read);
-                System.out.println(out);
-            }
+            String executionResult = getResultFromStdOut(outParamName, session);
+            log.info("Result of the job : " + executionResult);
+
+            // Set result
+            ActualParameter outParam = new ActualParameter();
+            outParam.getType().changeType(StringParameterType.type);
+            ((StringParameterType) outParam.getType()).setValue(executionResult);
+            jobExecutionContext.getOutMessageContext().addParameter(outParamName, outParam);
 
         } catch (InvalidSshKeyException e) {
             throw new GFacProviderException("Invalid SSH key", e);
@@ -176,6 +191,21 @@ public class EC2Provider implements GFac
 
     }
 
+    private String getResultFromStdOut(String outParamName, SessionChannelClient session) throws IOException { // Reads the session's input stream and returns the second '='-separated field of the first chunk that starts with outParamName; returns "" if none matches.
+        InputStream in = session.getInputStream(); // presumably the remote shell's stdout, per the method name - TODO confirm
+        byte buffer[] = new byte[255]; // each read() yields at most 255 bytes
+        int read;
+        String executionResult = ""; // returned as-is when no chunk matches
+        while((read = in.read(buffer)) > 0) { // loop ends on end-of-stream (-1) or a zero-length read
+            String out = new String(buffer, 0, read); // decoded with the platform default charset
+            if(out.startsWith(outParamName)) { // NOTE(review): tests the start of each read chunk, not of each output line - confirm chunks line up with lines
+                executionResult = out.split("=")[1].replace("\r","").replace("\n",""); // strips CR/LF; NOTE(review): throws ArrayIndexOutOfBoundsException if the chunk has no '=' or nothing after it
+                break; // first matching chunk wins
+            }
+        }
+        return executionResult;
+    }
+
     public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
 
     }