You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2013/06/11 15:54:05 UTC

svn commit: r1491791 - in /airavata/trunk/modules: gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/ registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/ registry/registry-api/src/main/java/org/a...

Author: samindaw
Date: Tue Jun 11 13:54:05 2013
New Revision: 1491791

URL: http://svn.apache.org/r1491791
Log:
application job data persistence for EC2Provider

Modified:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
    airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
    airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java Tue Jun 11 13:54:05 2013
@@ -202,7 +202,7 @@ public class BESProvider implements GFac
         appJob.setJobId(jobId);
         appJob.setJobData(jobDefinition.toString());
         appJob.setSubmittedTime(Calendar.getInstance().getTime());
-        appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+        appJob.setStatus(ApplicationJobStatus.SUBMITTED);
         appJob.setStatusUpdateTime(appJob.getSubmittedTime());
         GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
 	}

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java Tue Jun 11 13:54:05 2013
@@ -37,6 +37,9 @@ import com.sshtools.j2ssh.transport.publ
 import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
 import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
 import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.gfac.GFacException;
@@ -48,6 +51,9 @@ import org.apache.airavata.gfac.provider
 import org.apache.airavata.gfac.provider.utils.AmazonEC2Util;
 import org.apache.airavata.gfac.provider.utils.EC2ProviderUtil;
 import org.apache.airavata.gfac.provider.utils.ProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType;
 import org.apache.airavata.schemas.gfac.OutputParameterType;
@@ -58,6 +64,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 
@@ -74,9 +81,12 @@ public class EC2Provider implements GFac
     private Instance instance = null;
 
     private AmazonSecurityContext amazonSecurityContext;
+    
+    private String jobId;
 
     public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException{
         if (jobExecutionContext != null) {
+    		jobId="EC2_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
             if (jobExecutionContext.getSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT)
                     instanceof AmazonSecurityContext) {
                 this.amazonSecurityContext = (AmazonSecurityContext) jobExecutionContext.
@@ -96,7 +106,8 @@ public class EC2Provider implements GFac
             log.debug("INS_TYPE:" + amazonSecurityContext.getInstanceType());
             log.debug("USERNAME:" + amazonSecurityContext.getUserName());
         }
-
+        saveApplicationJob(jobExecutionContext);
+//        job
         /* Validation */
         if (amazonSecurityContext.getAccessKey() == null || amazonSecurityContext.getAccessKey().isEmpty())
             throw new GFacProviderException("EC2 Access Key is empty", jobExecutionContext);
@@ -113,13 +124,31 @@ public class EC2Provider implements GFac
         AWSCredentials credential =
                 new BasicAWSCredentials(amazonSecurityContext.getAccessKey(), amazonSecurityContext.getSecretKey());
         AmazonEC2Client ec2client = new AmazonEC2Client(credential);
-
+        GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.AUTHENTICATE);
         initEc2Environment(jobExecutionContext, ec2client);
         checkConnection(instance, ec2client);
     }
 
+	private void saveApplicationJob(JobExecutionContext jobExecutionContext) {
+		ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
+        job.setJobId(jobId);
+        job.setStatus(ApplicationJobStatus.VALIDATE_INPUT);
+        job.setSubmittedTime(Calendar.getInstance().getTime());
+        job.setStatusUpdateTime(job.getSubmittedTime());
+        GFacUtils.recordApplicationJob(jobExecutionContext, job);
+	}
+
     public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+    	GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.INITIALIZE);
         String shellCmd = createShellCmd(jobExecutionContext);
+        AiravataAPI airavataAPI = jobExecutionContext.getGFacConfiguration().getAiravataAPI();
+        if (airavataAPI!=null){
+        	try {
+				airavataAPI.getProvenanceManager().updateApplicationJobData(jobId, shellCmd);
+			} catch (AiravataAPIInvocationException e) {
+				log.error("Error in saving EC2 shell command!!!", e);
+			}
+        }
         SshClient sshClient = new SshClient();
         sshClient.setSocketTimeout(SOCKET_TIMEOUT);
         SshConnectionProperties properties = new SshConnectionProperties();
@@ -145,7 +174,7 @@ public class EC2Provider implements GFac
                     return true;
                 }
             });
-
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.AUTHENTICATE);
             // Initialize the authentication data.
             PublicKeyAuthenticationClient publicKeyAuth = new PublicKeyAuthenticationClient();
             publicKeyAuth.setUsername(amazonSecurityContext.getUserName());
@@ -164,11 +193,13 @@ public class EC2Provider implements GFac
             } else if(result==AuthenticationProtocolState.COMPLETE) {
                 log.info("ssh client authentication is complete...");
             }
-
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.SUBMITTED);
             SessionChannelClient session = sshClient.openSessionChannel();
             log.info("ssh session successfully opened...");
             session.requestPseudoTerminal("vt100", 80, 25, 0, 0, "");
             session.startShell();
+            
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.EXECUTING);
             session.getOutputStream().write(shellCmd.getBytes());
 
             InputStream in = session.getInputStream();
@@ -185,6 +216,7 @@ public class EC2Provider implements GFac
                     break;
                 }
             }
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.RESULTS_RETRIEVE);
 
             executionResult = executionResult.replace("\r","").replace("\n","");
             log.info("Result of the job : " + executionResult);
@@ -198,7 +230,7 @@ public class EC2Provider implements GFac
                 ((StringParameterType) outParam.getType()).setValue(executionResult);
                 jobExecutionContext.getOutMessageContext().addParameter(paramName, outParam);
             }
-
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, ApplicationJobStatus.FINISHED);
         } catch (InvalidSshKeyException e) {
             throw new GFacProviderException("Invalid SSH key", e);
         } catch (IOException e) {

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java Tue Jun 11 13:54:05 2013
@@ -138,7 +138,7 @@ public class GramProvider implements GFa
 		appJob.setJobId(job.getIDAsString());
 		appJob.setJobData(job.getRSL());
 		appJob.setSubmittedTime(Calendar.getInstance().getTime());
-		appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+		appJob.setStatus(ApplicationJobStatus.SUBMITTED);
 		appJob.setStatusUpdateTime(appJob.getSubmittedTime());
 		GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
 	}

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java Tue Jun 11 13:54:05 2013
@@ -21,6 +21,12 @@
 
 package org.apache.airavata.gfac.provider.impl;
 
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Map;
+
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
@@ -34,22 +40,16 @@ import org.apache.airavata.schemas.gfac.
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Executes hadoop job using the cluster configuration provided by handlers in
  * in-flow.

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java Tue Jun 11 13:54:05 2013
@@ -191,7 +191,7 @@ public class LocalProvider implements GF
 		JAXB.marshal(data, stream);
 		appJob.setJobData(stream.toString());
 		appJob.setSubmittedTime(Calendar.getInstance().getTime());
-		appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+		appJob.setStatus(ApplicationJobStatus.SUBMITTED);
 		appJob.setStatusUpdateTime(appJob.getSubmittedTime());
 		GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
 	}

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java Tue Jun 11 13:54:05 2013
@@ -86,7 +86,7 @@ public class SSHProvider implements GFac
 	private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
 		ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
 		job.setJobId(jobID);
-		job.setJobStatus(ApplicationJobStatus.INITIALIZE);
+		job.setStatus(ApplicationJobStatus.INITIALIZE);
 		job.setSubmittedTime(Calendar.getInstance().getTime());
 		job.setStatusUpdateTime(job.getSubmittedTime());
 		job.setJobData(executableName);

Modified: airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java (original)
+++ airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java Tue Jun 11 13:54:05 2013
@@ -2273,7 +2273,7 @@ public class AiravataJPARegistry extends
 		gfacJob.setJobData(job.getJobData());
 		gfacJob.setMetadata(job.getMetadata());
 		gfacJob.setServiceDescID(job.getServiceDescriptionId());
-		gfacJob.setStatus(job.getJobStatus().toString());
+		gfacJob.setStatus(job.getStatus().toString());
 		gfacJob.setSubmittedTime(new Timestamp(job.getSubmittedTime().getTime()));
 	}
 
@@ -2353,7 +2353,7 @@ public class AiravataJPARegistry extends
 		job.setHostDescriptionId(gfacJob.getHostDescID());
 		job.setJobData(gfacJob.getJobData());
 		job.setJobId(gfacJob.getLocalJobID());
-		job.setJobStatus(ApplicationJobStatus.valueOf(gfacJob.getStatus()));
+		job.setStatus(ApplicationJobStatus.valueOf(gfacJob.getStatus()));
 		job.setMetadata(gfacJob.getMetadata());
 		job.setNodeId(gfacJob.getNodeID());
 		job.setServiceDescriptionId(gfacJob.getServiceDescID());

Modified: airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java (original)
+++ airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java Tue Jun 11 13:54:05 2013
@@ -37,6 +37,10 @@ public class ApplicationJob {
 	 */
 	public static enum ApplicationJobStatus{
 		/**
+		 * Validating the application job input data and configurations
+		 */
+		VALIDATE_INPUT,
+		/**
 		 * Input data/files is being staged for the application job.
 		 */
 		STAGING,
@@ -45,14 +49,14 @@ public class ApplicationJob {
 		 */
 		AUTHENTICATE,
 		/**
-		 * Application job is submitted, possibly waiting to start executing.
-		 */
-		SUBMITTED,
-		/**
 		 * Application job is being initialized.
 		 */
 		INITIALIZE, 
 		/**
+		 * Application job is submitted, possibly waiting to start executing.
+		 */
+		SUBMITTED,
+		/**
 		 * Application job is waiting to start/continue its executing.
 		 */
 		PENDING,
@@ -67,7 +71,7 @@ public class ApplicationJob {
 		/**
 		 * Application job is waiting for data or a trigger to continue its execution.
 		 */
-		WAITING_FOR_DATA,
+		WAIT_FOR_DATA,
 		/**
 		 * Finalizing the execution of the application job.
 		 */
@@ -81,6 +85,10 @@ public class ApplicationJob {
 		 */
 		RESULTS_RETRIEVE,
 		/**
+		 * Validating the application job execution results
+		 */
+		VALIDATE_OUTPUT,
+		/**
 		 * Application job completed successfully.
 		 */
 		FINISHED,
@@ -113,7 +121,7 @@ public class ApplicationJob {
 	
 	private Date submittedTime;
 	private Date statusUpdateTime;
-	private ApplicationJobStatus jobStatus;
+	private ApplicationJobStatus status;
 	
 	private String metadata;
 
@@ -251,12 +259,12 @@ public class ApplicationJob {
 	 * Get the currently recorded status of the application job. 
 	 * @return
 	 */
-	public ApplicationJobStatus getJobStatus() {
-		return jobStatus;
+	public ApplicationJobStatus getStatus() {
+		return status;
 	}
 
-	public void setJobStatus(ApplicationJobStatus jobStatus) {
-		this.jobStatus = jobStatus;
+	public void setStatus(ApplicationJobStatus status) {
+		this.status = status;
 	}
 
 	/**