You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/02/01 22:59:15 UTC
svn commit: r1441646 - in
/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac:
context/MessageContext.java provider/GramProvider.java
utils/GramProviderUtils.java utils/OutputUtils.java
Author: lahiru
Date: Fri Feb 1 21:59:14 2013
New Revision: 1441646
URL: http://svn.apache.org/viewvc?rev=1441646&view=rev
Log:
adding file staging in to new gfac architecture.
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java Fri Feb 1 21:59:14 2013
@@ -22,6 +22,7 @@
package org.apache.airavata.gfac.context;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
public class MessageContext extends AbstractContext {
@@ -33,7 +34,7 @@ public class MessageContext extends Abst
}
public MessageContext(){
- this.parameters = new HashMap<String, Object>();
+ this.parameters = new LinkedHashMap<String, Object>();
}
public Object getParameter(String parameterName) {
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java Fri Feb 1 21:59:14 2013
@@ -44,6 +44,9 @@ public class GramProvider implements GFa
// This method precpare the environment before the application invocation.
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
GramProviderUtils.makeDirectory(jobExecutionContext);
+ //Note: this step has to be done before setupEnvironment, otherwise the input file path adjustment based on the
+ //application-hosting machine will not be reflected in the RSL
+ GramProviderUtils.processInput(jobExecutionContext);
job = GramProviderUtils.setupEnvironment(jobExecutionContext);
listener = new GramJobSubmissionListener(job, jobExecutionContext);
job.addListener(listener);
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java Fri Feb 1 21:59:14 2013
@@ -21,13 +21,15 @@
package org.apache.airavata.gfac.utils;
import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.ToolsException;
import org.apache.airavata.gfac.context.GSISecurityContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.*;
import org.apache.xmlbeans.XmlException;
import org.globus.gram.GramAttributes;
import org.globus.gram.GramJob;
@@ -38,9 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class GramProviderUtils {
private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
@@ -114,13 +114,14 @@ public class GramProviderUtils {
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
GridFtp ftp = new GridFtp();
File localStdErrFile = null;
+ Map<String, ActualParameter> stringMap = null;
try {
GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
GSSCredential gssCred = gssContext.getGssCredentails();
String[] hostgridFTP = host.getGridFTPEndPointArray();
if (hostgridFTP == null || hostgridFTP.length == 0) {
- hostgridFTP = new String[] { host.getHostAddress() };
+ hostgridFTP = new String[]{host.getHostAddress()};
}
GFacProviderException pe = null;
for (String endpoint : host.getGridFTPEndPointArray()) {
@@ -146,84 +147,54 @@ public class GramProviderUtils {
String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
- return OutputUtils.fillOutputFromStdout(jobExecutionContext, stdout, stderr);
-// Map<String,ActualParameter> stringMap = null;
-// MessageContext<Object> output = jobExecutionContext.getOutput();
-// for (Iterator<String> iterator = output.getNames(); iterator.hasNext(); ) {
-// String paramName = iterator.next();
-// ActualParameter actualParameter = (ActualParameter) output.getValue(paramName);
-// if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-// URI outputURI = GfacUtils.createGsiftpURI(endpoint,app.getOutputDataDirectory());
-// List<String> outputList = ftp.listDir(outputURI,gssCred);
-// String[] valueList = outputList.toArray(new String[outputList.size()]);
-// ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
-// stringMap = new HashMap<String, ActualParameter>();
-// stringMap.put(paramName, actualParameter);
-// jobExecutionContext.getExecutionContext().getNotifier().output(jobExecutionContext, actualParameter.toString());
-// }
-// if ("StringArray".equals(actualParameter.getType().getType().toString())) {
-// String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
-// ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
-// stringMap = new HashMap<String, ActualParameter>();
-// stringMap.put(paramName, actualParameter);
-// jobExecutionContext.getExecutionContext().getNotifier().output(jobExecutionContext, actualParameter.toString());
-// }
-// else{
-// // This is to handle exception during the output parsing.
-// stringMap = OutputUtils.fillOutputFromStdout(jobExecutionContext.<ActualParameter>getOutput(), stdout,stderr);
-// String paramValue = output.getStringValue(paramName);
-// if(paramValue == null || paramValue.isEmpty()){
-// int errCode = listener.getError();
-// String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress();
-// JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
-// gateKeeper, job.getRSL(), jobExecutionContext);
-// errorReason(errCode, error);
-// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,error,
-// readLastLinesofStdOut(localStdErrFile.getPath(), 20));
-// throw error;
-// }
-// }
-// }
-// if(stringMap == null || stringMap.isEmpty()){
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ Set<String> keys = output.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ URI outputURI = GFacUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+ List<String> outputList = ftp.listDir(outputURI, gssCred);
+ String[] valueList = outputList.toArray(new String[outputList.size()]);
+ ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
+ stringMap = new HashMap<String, ActualParameter>();
+ stringMap.put(paramName, actualParameter);
+ }
+ if ("StringArray".equals(actualParameter.getType().getType().toString())) {
+ String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
+ ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
+ stringMap = new HashMap<String, ActualParameter>();
+ stringMap.put(paramName, actualParameter);
+ } else {
+ // This is to handle exception during the output parsing.
+ stringMap = OutputUtils.fillOutputFromStdout(jobExecutionContext, stdout, stderr);
+ }
+ }
+ if (stringMap == null || stringMap.isEmpty()) {
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(new Throwable("Empty Output returned from the Application, Double check the application" +
+ "and ApplicationDescriptor output Parameter Names")));
// GFacProviderException exception = new GFacProviderException("Gram provider: Error creating job output", jobExecutionContext);
// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,exception,exception.getLocalizedMessage());
// throw exception;
-// }
-// // If users has given an output DAta poth we download the output files in to that directory, this will be apath in the machine where GFac is installed
-// if(WorkflowContextHeaderBuilder.getCurrentContextHeader() != null &&
-// WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowOutputDataHandling() != null){
-// WorkflowOutputDataHandlingDocument.WorkflowOutputDataHandling workflowOutputDataHandling =
-// WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowOutputDataHandling();
-// if(workflowOutputDataHandling.getApplicationOutputDataHandlingArray().length != 0){
-// String outputDataDirectory = workflowOutputDataHandling.getApplicationOutputDataHandlingArray()[0].getOutputDataDirectory();
-// if(outputDataDirectory != null && !"".equals(outputDataDirectory)){
-// stageOutputFiles(jobExecutionContext,outputDataDirectory);
-// }
-// }
-// }
-// return stringMap;
-// }catch (XmlException e) {
-// throw new GFacProviderException(e.getMessage(),jobExecutionContext, e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
-// }
- }catch (ToolsException e) {
- throw new GFacProviderException(e.getMessage(),jobExecutionContext, e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+ }
+ //todo check the workflow context header and run the stageOutputFiles method to stage the output files into a user defined location
+ } catch (ToolsException e) {
+ throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
} catch (URISyntaxException e) {
throw new GFacProviderException("URI is malformatted:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- }catch (NullPointerException e) {
+ } catch (NullPointerException e) {
throw new GFacProviderException("Output is not produced in stdout:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
}
}
- //todo this return has to be removed
- return null;
-
+ //todo this return has to be removed
+ return stringMap;
} catch (Exception e) {
// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- throw new GFacProviderException(e.getMessage(), jobExecutionContext,e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+ throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
}
}
- private static String readLastLinesofStdOut(String path, int count) {
+ private static String readLastLinesofStdOut(String path, int count) {
StringBuffer buffer = new StringBuffer();
FileInputStream in = null;
try {
@@ -244,12 +215,12 @@ public class GramProviderUtils {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
if (numberofLines > count) {
- for (int i = numberofLines - count; i < numberofLines; i++) {
+ for (int i = numberofLines - count; i < numberofLines; i++) {
buffer.append(strLine.get(i));
buffer.append("\n");
}
- }else{
- for (int i = 0; i < numberofLines; i++) {
+ } else {
+ for (int i = 0; i < numberofLines; i++) {
buffer.append(strLine.get(i));
buffer.append("\n");
}
@@ -261,4 +232,126 @@ public class GramProviderUtils {
}
return buffer.toString();
}
+
+ private static void stageOutputFiles(JobExecutionContext jobExecutionContext, String outputFileStagingPath) throws GFacProviderException {
+ MessageContext outputNew = new MessageContext();
+ MessageContext output = jobExecutionContext.getOutMessageContext();
+ Map<String, Object> parameters = output.getParameters();
+ for (String paramName : parameters.keySet()) {
+ ActualParameter actualParameter = (ActualParameter) parameters
+ .get(paramName);
+ //TODO: Review this with type
+ GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+ GridFtp ftp = new GridFtp();
+
+ GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+ GSSCredential gssCred = null;
+ try {
+ gssCred = gssContext.getGssCredentails();
+ } catch (SecurityException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ try {
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ for (String endpoint : host.getGridFTPEndPointArray()) {
+ ((URIParameterType) actualParameter.getType()).setValue(doStaging(outputFileStagingPath,
+ MappingFactory.toString(actualParameter), ftp, gssCred, endpoint));
+ }
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(MappingFactory.toString(actualParameter).split(","));
+ List<String> newFiles = new ArrayList<String>();
+ for (String endpoint : host.getGridFTPEndPointArray()) {
+ for (String paramValueEach : split) {
+ newFiles.add(doStaging(outputFileStagingPath, paramValueEach, ftp, gssCred, endpoint));
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+
+ }
+ } catch (URISyntaxException e) {
+ throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+ } catch (ToolsException e) {
+ throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+ }
+ outputNew.getParameters().put(paramName, actualParameter);
+ }
+ jobExecutionContext.setOutMessageContext(outputNew);
+ }
+
+ private static String doStaging(String outputFileStagingPath, String paramValue, GridFtp ftp, GSSCredential gssCred, String endpoint) throws URISyntaxException, ToolsException {
+ URI srcURI = GFacUtils.createGsiftpURI(endpoint, paramValue);
+ String fileName = new File(srcURI.getPath()).getName();
+ File outputFile = new File(outputFileStagingPath + File.separator + fileName);
+ ftp.readRemoteFile(srcURI,
+ gssCred, outputFile);
+ return outputFileStagingPath + File.separator + fileName;
+ }
+
+ private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws URISyntaxException, SecurityException, ToolsException, IOException {
+ URI gridftpURL;
+ gridftpURL = new URI(paramValue);
+ GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ GridFtp ftp = new GridFtp();
+ URI destURI = null;
+ GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+ GSSCredential gssCred = gssContext.getGssCredentails();
+
+ for (String endpoint : host.getGridFTPEndPointArray()) {
+ URI inputURI = GFacUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+ String fileName = new File(gridftpURL.getPath()).getName();
+ String s = inputURI.getPath() + File.separator + fileName;
+ //if the user gives a URL that just refers to an endpoint rather than a web resource, we do not do any transfer
+ if (fileName != null && !"".equals(fileName)) {
+ destURI = GFacUtils.createGsiftpURI(endpoint, s);
+ if (paramValue.startsWith("gsiftp")) {
+ ftp.uploadFile(gridftpURL, destURI, gssCred);
+ } else if (paramValue.startsWith("file")) {
+ String localFile = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ ftp.uploadFile(destURI, gssCred, new FileInputStream(localFile));
+ } else if (paramValue.startsWith("http")) {
+ ftp.uploadFile(destURI,
+ gssCred, (gridftpURL.toURL().openStream()));
+ } else {
+ //todo throw exception telling unsupported protocol
+ return paramValue;
+ }
+ } else {
+ // When the given input is not a web resource but a URI type input, we don't do any transfer and just keep the same value as it is in the input
+ return paramValue;
+ }
+ }
+ System.out.println(destURI.getPath());
+ return destURI.getPath();
+ }
+
+ public static Map<String, ?> processInput(JobExecutionContext jobExecutionContext)
+ throws GFacProviderException {
+ MessageContext inputNew = new MessageContext();
+ try {
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName:parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(paramValue.split(","));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ newFiles.add(stageInputFiles(jobExecutionContext, paramValueEach));
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,"Error during Input File staging");
+ throw new GFacProviderException("Error while input File Staging", jobExecutionContext, e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ return null;
+ }
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java Fri Feb 1 21:59:14 2013
@@ -29,14 +29,12 @@ import org.apache.airavata.schemas.gfac.
import org.apache.airavata.schemas.gfac.StdOutParameterType;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class OutputUtils {
- private OutputUtils() {
- }
+ private static String regexPattern = "\\s*=\\s*([^\\[\\s'\"][^\\s]*|\"[^\"]*\"|'[^']*'|\\[[^\\[]*\\])";
public static Map<String, ActualParameter> fillOutputFromStdout(JobExecutionContext context, String stdout, String stderr) throws Exception {
@@ -65,11 +63,12 @@ public class OutputUtils {
}
}
}
+
return result;
}
private static String parseStdout(String stdout, String outParam) throws Exception {
- String regex = Pattern.quote(outParam) + "\\s*=\\s*([^\\[\\s'\"][^\\s]*|\"[^\"]*\"|'[^']*'|\\[[^\\[]*\\])";
+ String regex = Pattern.quote(outParam) + regexPattern;
String match = null;
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(stdout);
@@ -83,4 +82,19 @@ public class OutputUtils {
throw new Exception("Data for the output parameter '" + outParam + "' was not found");
}
}
+
+ public static String[] parseStdoutArray(String stdout, String outParam) throws Exception {
+ String regex = Pattern.quote(outParam) + regexPattern;
+ StringBuffer match = new StringBuffer();
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(stdout);
+ while (matcher.find()) {
+ match.append(matcher.group(1) + ",");
+ }
+ if (match != null) {
+ return match.toString().split(",");
+ } else {
+ throw new Exception("Data for the output parameter '" + outParam + "' was not found");
+ }
+ }
}