You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2013/06/07 18:32:56 UTC
svn commit: r1490710 - in /airavata/trunk/modules:
gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
integration-tests/src/test/java/org/apache/airavata/integration/BaseCaseIT.java
Author: samindaw
Date: Fri Jun 7 16:32:55 2013
New Revision: 1490710
URL: http://svn.apache.org/r1490710
Log:
adding application job data save for local provider + additional chks in integration tests
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
airavata/trunk/modules/integration-tests/src/test/java/org/apache/airavata/integration/BaseCaseIT.java
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java?rev=1490710&r1=1490709&r2=1490710&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java Fri Jun 7 16:32:55 2013
@@ -20,6 +20,17 @@
*/
package org.apache.airavata.gfac.provider.impl;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.bind.JAXB;
+
+import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
@@ -31,23 +42,57 @@ import org.apache.airavata.gfac.utils.GF
import org.apache.airavata.gfac.utils.InputStreamToFileWriter;
import org.apache.airavata.gfac.utils.InputUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.NameValuePairType;
import org.apache.xmlbeans.XmlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
public class LocalProvider implements GFacProvider {
private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
private ProcessBuilder builder;
private List<String> cmdList;
-
+ private String jobId;
+
+ public static class LocalProviderJobData{
+ private String applicationName;
+ private List<String> inputParameters;
+ private String workingDir;
+ private String inputDir;
+ private String outputDir;
+ public String getApplicationName() {
+ return applicationName;
+ }
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+ public List<String> getInputParameters() {
+ return inputParameters;
+ }
+ public void setInputParameters(List<String> inputParameters) {
+ this.inputParameters = inputParameters;
+ }
+ public String getWorkingDir() {
+ return workingDir;
+ }
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+ public String getInputDir() {
+ return inputDir;
+ }
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+ public String getOutputDir() {
+ return outputDir;
+ }
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+ }
public LocalProvider(){
cmdList = new ArrayList<String>();
}
@@ -82,7 +127,28 @@ public class LocalProvider implements GF
try {
// running cmd
Process process = builder.start();
-
+ jobId="Local_"+Calendar.getInstance().getTimeInMillis()+"_";
+ if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() != null){
+ ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+ appJob.setJobId(jobId);
+ LocalProviderJobData data = new LocalProviderJobData();
+ data.setApplicationName(app.getExecutableLocation());
+ data.setInputDir(app.getInputDataDirectory());
+ data.setOutputDir(app.getOutputDataDirectory());
+ data.setWorkingDir(builder.directory().toString());
+ data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ JAXB.marshal(data, stream);
+ appJob.setJobData(stream.toString());
+ appJob.setSubmittedTime(Calendar.getInstance().getTime());
+ appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+ appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+ try {
+ jobExecutionContext.getGFacConfiguration().getAiravataAPI().getProvenanceManager().addApplicationJob(appJob);
+ } catch (AiravataAPIInvocationException e) {
+ e.printStackTrace();
+ }
+ }
Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
@@ -95,6 +161,14 @@ public class LocalProvider implements GF
// wait for the process (application) to finish executing
int returnValue = process.waitFor();
+ if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() != null){
+ try {
+ jobExecutionContext.getGFacConfiguration().getAiravataAPI().getProvenanceManager().updateApplicationJobStatus(jobId, ApplicationJobStatus.FINALIZE, Calendar.getInstance().getTime());
+ } catch (AiravataAPIInvocationException e) {
+ e.printStackTrace();
+ }
+ }
+
// make sure other two threads are done
standardOutWriter.join();
standardErrorWriter.join();
@@ -104,8 +178,22 @@ public class LocalProvider implements GF
* just provide warning in the log messages
*/
if (returnValue != 0) {
+ if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() != null){
+ try {
+ jobExecutionContext.getGFacConfiguration().getAiravataAPI().getProvenanceManager().updateApplicationJobStatus(jobId, ApplicationJobStatus.FAILED, Calendar.getInstance().getTime());
+ } catch (AiravataAPIInvocationException e) {
+ e.printStackTrace();
+ }
+ }
log.error("Process finished with non zero return value. Process may have failed");
} else {
+ if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() != null){
+ try {
+ jobExecutionContext.getGFacConfiguration().getAiravataAPI().getProvenanceManager().updateApplicationJobStatus(jobId, ApplicationJobStatus.FINISHED, Calendar.getInstance().getTime());
+ } catch (AiravataAPIInvocationException e) {
+ e.printStackTrace();
+ }
+ }
log.info("Process finished with return value of zero.");
}
Modified: airavata/trunk/modules/integration-tests/src/test/java/org/apache/airavata/integration/BaseCaseIT.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/integration-tests/src/test/java/org/apache/airavata/integration/BaseCaseIT.java?rev=1490710&r1=1490709&r2=1490710&view=diff
==============================================================================
--- airavata/trunk/modules/integration-tests/src/test/java/org/apache/airavata/integration/BaseCaseIT.java (original)
+++ airavata/trunk/modules/integration-tests/src/test/java/org/apache/airavata/integration/BaseCaseIT.java Fri Jun 7 16:32:55 2013
@@ -45,6 +45,7 @@ import org.apache.airavata.commons.gfac.
import org.apache.airavata.commons.gfac.type.ServiceDescription;
import org.apache.airavata.registry.api.PasswordCallback;
import org.apache.airavata.registry.api.impl.WorkflowExecutionDataImpl;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
import org.apache.airavata.registry.api.workflow.ExperimentData;
import org.apache.airavata.registry.api.workflow.InputData;
import org.apache.airavata.registry.api.workflow.NodeExecutionData;
@@ -424,9 +425,12 @@ public class BaseCaseIT {
airavataAPI.getExecutionManager().waitForExperimentTermination(experimentId);
log.info("Run workflow completed ....");
- log.info("Starting monitoring ....");
verifyOutput(experimentId, "echo_output=Airavata_Test");
+
+ log.info("Verifying application jobs ....");
+ List<ApplicationJob> applicationJobs = airavataAPI.getProvenanceManager().getApplicationJobs(experimentId, null, null);
+ Assert.assertEquals(applicationJobs.size(), 1);
}
protected void verifyOutput(String experimentId, String outputVerifyingString) throws Exception {