You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2013/06/11 15:54:05 UTC
svn commit: r1491791 - in /airavata/trunk/modules:
gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/
registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/
registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/
Author: samindaw
Date: Tue Jun 11 13:54:05 2013
New Revision: 1491791
URL: http://svn.apache.org/r1491791
Log:
application job data persistence for EC2Provider
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java Tue Jun 11 13:54:05 2013
@@ -202,7 +202,7 @@ public class BESProvider implements GFac
appJob.setJobId(jobId);
appJob.setJobData(jobDefinition.toString());
appJob.setSubmittedTime(Calendar.getInstance().getTime());
- appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+ appJob.setStatus(ApplicationJobStatus.SUBMITTED);
appJob.setStatusUpdateTime(appJob.getSubmittedTime());
GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java Tue Jun 11 13:54:05 2013
@@ -37,6 +37,9 @@ import com.sshtools.j2ssh.transport.publ
import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.gfac.GFacException;
@@ -48,6 +51,9 @@ import org.apache.airavata.gfac.provider
import org.apache.airavata.gfac.provider.utils.AmazonEC2Util;
import org.apache.airavata.gfac.provider.utils.EC2ProviderUtil;
import org.apache.airavata.gfac.provider.utils.ProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType;
import org.apache.airavata.schemas.gfac.OutputParameterType;
@@ -58,6 +64,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Calendar;
import java.util.List;
import java.util.Map;
@@ -74,9 +81,12 @@ public class EC2Provider implements GFac
private Instance instance = null;
private AmazonSecurityContext amazonSecurityContext;
+
+ private String jobId;
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException{
if (jobExecutionContext != null) {
+ jobId="EC2_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
if (jobExecutionContext.getSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT)
instanceof AmazonSecurityContext) {
this.amazonSecurityContext = (AmazonSecurityContext) jobExecutionContext.
@@ -96,7 +106,8 @@ public class EC2Provider implements GFac
log.debug("INS_TYPE:" + amazonSecurityContext.getInstanceType());
log.debug("USERNAME:" + amazonSecurityContext.getUserName());
}
-
+ saveApplicationJob(jobExecutionContext);
+// job
/* Validation */
if (amazonSecurityContext.getAccessKey() == null || amazonSecurityContext.getAccessKey().isEmpty())
throw new GFacProviderException("EC2 Access Key is empty", jobExecutionContext);
@@ -113,13 +124,31 @@ public class EC2Provider implements GFac
AWSCredentials credential =
new BasicAWSCredentials(amazonSecurityContext.getAccessKey(), amazonSecurityContext.getSecretKey());
AmazonEC2Client ec2client = new AmazonEC2Client(credential);
-
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.AUTHENTICATE);
initEc2Environment(jobExecutionContext, ec2client);
checkConnection(instance, ec2client);
}
+ private void saveApplicationJob(JobExecutionContext jobExecutionContext) {
+ ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
+ job.setJobId(jobId);
+ job.setStatus(ApplicationJobStatus.VALIDATE_INPUT);
+ job.setSubmittedTime(Calendar.getInstance().getTime());
+ job.setStatusUpdateTime(job.getSubmittedTime());
+ GFacUtils.recordApplicationJob(jobExecutionContext, job);
+ }
+
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.INITIALIZE);
String shellCmd = createShellCmd(jobExecutionContext);
+ AiravataAPI airavataAPI = jobExecutionContext.getGFacConfiguration().getAiravataAPI();
+ if (airavataAPI!=null){
+ try {
+ airavataAPI.getProvenanceManager().updateApplicationJobData(jobId, shellCmd);
+ } catch (AiravataAPIInvocationException e) {
+ log.error("Error in saving EC2 shell command!!!", e);
+ }
+ }
SshClient sshClient = new SshClient();
sshClient.setSocketTimeout(SOCKET_TIMEOUT);
SshConnectionProperties properties = new SshConnectionProperties();
@@ -145,7 +174,7 @@ public class EC2Provider implements GFac
return true;
}
});
-
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.AUTHENTICATE);
// Initialize the authentication data.
PublicKeyAuthenticationClient publicKeyAuth = new PublicKeyAuthenticationClient();
publicKeyAuth.setUsername(amazonSecurityContext.getUserName());
@@ -164,11 +193,13 @@ public class EC2Provider implements GFac
} else if(result==AuthenticationProtocolState.COMPLETE) {
log.info("ssh client authentication is complete...");
}
-
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.SUBMITTED);
SessionChannelClient session = sshClient.openSessionChannel();
log.info("ssh session successfully opened...");
session.requestPseudoTerminal("vt100", 80, 25, 0, 0, "");
session.startShell();
+
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.EXECUTING);
session.getOutputStream().write(shellCmd.getBytes());
InputStream in = session.getInputStream();
@@ -185,6 +216,7 @@ public class EC2Provider implements GFac
break;
}
}
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.RESULTS_RETRIEVE);
executionResult = executionResult.replace("\r","").replace("\n","");
log.info("Result of the job : " + executionResult);
@@ -198,7 +230,7 @@ public class EC2Provider implements GFac
((StringParameterType) outParam.getType()).setValue(executionResult);
jobExecutionContext.getOutMessageContext().addParameter(paramName, outParam);
}
-
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.FINISHED);
} catch (InvalidSshKeyException e) {
throw new GFacProviderException("Invalid SSH key", e);
} catch (IOException e) {
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java Tue Jun 11 13:54:05 2013
@@ -138,7 +138,7 @@ public class GramProvider implements GFa
appJob.setJobId(job.getIDAsString());
appJob.setJobData(job.getRSL());
appJob.setSubmittedTime(Calendar.getInstance().getTime());
- appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+ appJob.setStatus(ApplicationJobStatus.SUBMITTED);
appJob.setStatusUpdateTime(appJob.getSubmittedTime());
GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java Tue Jun 11 13:54:05 2013
@@ -21,6 +21,12 @@
package org.apache.airavata.gfac.provider.impl;
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Map;
+
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
@@ -34,22 +40,16 @@ import org.apache.airavata.schemas.gfac.
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Executes hadoop job using the cluster configuration provided by handlers in
* in-flow.
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java Tue Jun 11 13:54:05 2013
@@ -191,7 +191,7 @@ public class LocalProvider implements GF
JAXB.marshal(data, stream);
appJob.setJobData(stream.toString());
appJob.setSubmittedTime(Calendar.getInstance().getTime());
- appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+ appJob.setStatus(ApplicationJobStatus.SUBMITTED);
appJob.setStatusUpdateTime(appJob.getSubmittedTime());
GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java Tue Jun 11 13:54:05 2013
@@ -86,7 +86,7 @@ public class SSHProvider implements GFac
private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
job.setJobId(jobID);
- job.setJobStatus(ApplicationJobStatus.INITIALIZE);
+ job.setStatus(ApplicationJobStatus.INITIALIZE);
job.setSubmittedTime(Calendar.getInstance().getTime());
job.setStatusUpdateTime(job.getSubmittedTime());
job.setJobData(executableName);
Modified: airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java (original)
+++ airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java Tue Jun 11 13:54:05 2013
@@ -2273,7 +2273,7 @@ public class AiravataJPARegistry extends
gfacJob.setJobData(job.getJobData());
gfacJob.setMetadata(job.getMetadata());
gfacJob.setServiceDescID(job.getServiceDescriptionId());
- gfacJob.setStatus(job.getJobStatus().toString());
+ gfacJob.setStatus(job.getStatus().toString());
gfacJob.setSubmittedTime(new Timestamp(job.getSubmittedTime().getTime()));
}
@@ -2353,7 +2353,7 @@ public class AiravataJPARegistry extends
job.setHostDescriptionId(gfacJob.getHostDescID());
job.setJobData(gfacJob.getJobData());
job.setJobId(gfacJob.getLocalJobID());
- job.setJobStatus(ApplicationJobStatus.valueOf(gfacJob.getStatus()));
+ job.setStatus(ApplicationJobStatus.valueOf(gfacJob.getStatus()));
job.setMetadata(gfacJob.getMetadata());
job.setNodeId(gfacJob.getNodeID());
job.setServiceDescriptionId(gfacJob.getServiceDescID());
Modified: airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java (original)
+++ airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java Tue Jun 11 13:54:05 2013
@@ -37,6 +37,10 @@ public class ApplicationJob {
*/
public static enum ApplicationJobStatus{
/**
+ * Validating the application job input data and configurations
+ */
+ VALIDATE_INPUT,
+ /**
* Input data/files is being staged for the application job.
*/
STAGING,
@@ -45,14 +49,14 @@ public class ApplicationJob {
*/
AUTHENTICATE,
/**
- * Application job is submitted, possibly waiting to start executing.
- */
- SUBMITTED,
- /**
* Application job is being initialized.
*/
INITIALIZE,
/**
+ * Application job is submitted, possibly waiting to start executing.
+ */
+ SUBMITTED,
+ /**
* Application job is waiting to start/continue its executing.
*/
PENDING,
@@ -67,7 +71,7 @@ public class ApplicationJob {
/**
* Application job is waiting for data or a trigger to continue its execution.
*/
- WAITING_FOR_DATA,
+ WAIT_FOR_DATA,
/**
* Finalizing the execution of the application job.
*/
@@ -81,6 +85,10 @@ public class ApplicationJob {
*/
RESULTS_RETRIEVE,
/**
+ * Validating the application job execution results
+ */
+ VALIDATE_OUTPUT,
+ /**
* Application job completed successfully.
*/
FINISHED,
@@ -113,7 +121,7 @@ public class ApplicationJob {
private Date submittedTime;
private Date statusUpdateTime;
- private ApplicationJobStatus jobStatus;
+ private ApplicationJobStatus status;
private String metadata;
@@ -251,12 +259,12 @@ public class ApplicationJob {
* Get the currently recorded status of the application job.
* @return
*/
- public ApplicationJobStatus getJobStatus() {
- return jobStatus;
+ public ApplicationJobStatus getStatus() {
+ return status;
}
- public void setJobStatus(ApplicationJobStatus jobStatus) {
- this.jobStatus = jobStatus;
+ public void setStatus(ApplicationJobStatus status) {
+ this.status = status;
}
/**