You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by he...@apache.org on 2013/03/09 03:24:55 UTC
svn commit: r1454642 -
/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
Author: heshan
Date: Sat Mar 9 02:24:55 2013
New Revision: 1454642
URL: http://svn.apache.org/r1454642
Log:
AIRAVATA-202 Executing the job on EC2 and adding the result to jobContext.
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java?rev=1454642&r1=1454641&r2=1454642&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java Sat Mar 9 02:24:55 2013
@@ -39,11 +39,14 @@ import com.sshtools.j2ssh.transport.publ
import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
import com.sshtools.j2ssh.util.Base64;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.context.AmazonSecurityContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.notification.events.EC2ProviderEvent;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.schemas.gfac.OutputParameterType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
import org.bouncycastle.openssl.PEMWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,6 +130,16 @@ public class EC2Provider implements GFac
// Connect to the host
try
{
+ String outParamName;
+ OutputParameterType[] outputParametersArray = jobExecutionContext.getApplicationContext().
+ getServiceDescription().getType().getOutputParametersArray();
+ if(outputParametersArray != null) {
+ outParamName = outputParametersArray[0].getParameterName();
+ } else {
+ throw new GFacProviderException("Output parameter name is not set. Therefore, not being able " +
+ "to filter the job result from standard out ", jobExecutionContext);
+ }
+
sshClient.connect(properties, new HostKeyVerification() {
public boolean verifyHost(String s, SshPublicKey sshPublicKey) throws TransportProtocolException {
log.debug("Verifying Host: " + s);
@@ -152,19 +165,21 @@ public class EC2Provider implements GFac
log.info("ssh client authentication is complete...");
}
+ // Execute job
SessionChannelClient session = sshClient.openSessionChannel();
log.info("ssh session is open successfully...");
session.requestPseudoTerminal("vt100", 80, 25, 0, 0, "");
session.startShell();
session.getOutputStream().write(command2.getBytes());
- InputStream in = session.getInputStream();
- byte buffer[] = new byte[255];
- int read;
- while((read = in.read(buffer)) > 0) {
- String out = new String(buffer, 0, read);
- System.out.println(out);
- }
+ String executionResult = getResultFromStdOut(outParamName, session);
+ log.info("Result of the job : " + executionResult);
+
+ // Set result
+ ActualParameter outParam = new ActualParameter();
+ outParam.getType().changeType(StringParameterType.type);
+ ((StringParameterType) outParam.getType()).setValue(executionResult);
+ jobExecutionContext.getOutMessageContext().addParameter(outParamName, outParam);
} catch (InvalidSshKeyException e) {
throw new GFacProviderException("Invalid SSH key", e);
@@ -176,6 +191,21 @@ public class EC2Provider implements GFac
}
+ /**
+  * Scans the remote shell's standard output for the job result.
+  *
+  * The output is read in chunks and reassembled into lines. The first line that starts with
+  * {@code outParamName} and contains a '=' after that name is taken as the result line, and
+  * everything after that '=' (with carriage returns removed) is returned.
+  *
+  * @param outParamName name of the output parameter that prefixes the result line
+  * @param session      open SSH session whose input stream carries the remote output
+  * @return the text following the '=' on the result line, or an empty string if the stream
+  *         ends without such a line
+  * @throws IOException if reading from the session's input stream fails
+  */
+ private String getResultFromStdOut(String outParamName, SessionChannelClient session) throws IOException {
+     InputStream in = session.getInputStream();
+     byte buffer[] = new byte[255];
+     // Output received so far that has not yet been terminated by a newline. A single read()
+     // may return part of a line or several lines, so chunks cannot be matched directly.
+     StringBuilder pending = new StringBuilder();
+     int read;
+     while ((read = in.read(buffer)) > 0) {
+         pending.append(new String(buffer, 0, read));
+         int eol;
+         while ((eol = pending.indexOf("\n")) >= 0) {
+             String line = pending.substring(0, eol).replace("\r", "");
+             pending.delete(0, eol + 1);
+             if (line.startsWith(outParamName)) {
+                 // Use indexOf instead of split("=")[1]: split drops trailing empty strings,
+                 // so "name=" or a line without '=' would throw ArrayIndexOutOfBoundsException.
+                 int separator = line.indexOf('=', outParamName.length());
+                 if (separator >= 0) {
+                     return line.substring(separator + 1);
+                 }
+             }
+         }
+     }
+     // The stream ended; the last line may not have been terminated by a newline.
+     String tail = pending.toString().replace("\r", "");
+     if (tail.startsWith(outParamName)) {
+         int separator = tail.indexOf('=', outParamName.length());
+         if (separator >= 0) {
+             return tail.substring(separator + 1);
+         }
+     }
+     return "";
+ }
+
public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
}