You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/02/01 22:59:15 UTC

svn commit: r1441646 - in /airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac: context/MessageContext.java provider/GramProvider.java utils/GramProviderUtils.java utils/OutputUtils.java

Author: lahiru
Date: Fri Feb  1 21:59:14 2013
New Revision: 1441646

URL: http://svn.apache.org/viewvc?rev=1441646&view=rev
Log:
Adding file staging into the new gfac architecture.

Modified:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java Fri Feb  1 21:59:14 2013
@@ -22,6 +22,7 @@
 package org.apache.airavata.gfac.context;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class MessageContext extends AbstractContext {
@@ -33,7 +34,7 @@ public class MessageContext extends Abst
     }
 
     public MessageContext(){
-        this.parameters = new HashMap<String, Object>();
+        this.parameters = new LinkedHashMap<String, Object>();
     }
 
     public Object getParameter(String parameterName) {

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java Fri Feb  1 21:59:14 2013
@@ -44,6 +44,9 @@ public class GramProvider implements GFa
     // This method precpare the environment before the application invocation.
     public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
         GramProviderUtils.makeDirectory(jobExecutionContext);
+        // Note: this step has to be done before setupEnvironment; otherwise the input file paths adjusted for the
+        // machine hosting the application will not be reflected in the RSL.
+        GramProviderUtils.processInput(jobExecutionContext);
         job = GramProviderUtils.setupEnvironment(jobExecutionContext);
         listener = new GramJobSubmissionListener(job, jobExecutionContext);
         job.addListener(listener);

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java Fri Feb  1 21:59:14 2013
@@ -21,13 +21,15 @@
 package org.apache.airavata.gfac.utils;
 
 import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
 import org.apache.airavata.gfac.ToolsException;
 import org.apache.airavata.gfac.context.GSISecurityContext;
 import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
 import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
 import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.*;
 import org.apache.xmlbeans.XmlException;
 import org.globus.gram.GramAttributes;
 import org.globus.gram.GramJob;
@@ -38,9 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.io.*;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class GramProviderUtils {
     private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
@@ -114,13 +114,14 @@ public class GramProviderUtils {
         ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
         GridFtp ftp = new GridFtp();
         File localStdErrFile = null;
+        Map<String, ActualParameter> stringMap = null;
         try {
             GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
             GSSCredential gssCred = gssContext.getGssCredentails();
 
             String[] hostgridFTP = host.getGridFTPEndPointArray();
             if (hostgridFTP == null || hostgridFTP.length == 0) {
-                hostgridFTP = new String[] { host.getHostAddress() };
+                hostgridFTP = new String[]{host.getHostAddress()};
             }
             GFacProviderException pe = null;
             for (String endpoint : host.getGridFTPEndPointArray()) {
@@ -146,84 +147,54 @@ public class GramProviderUtils {
 
                     String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
                     String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
-                    return OutputUtils.fillOutputFromStdout(jobExecutionContext, stdout, stderr);
-//                    Map<String,ActualParameter> stringMap = null;
-//                    MessageContext<Object> output = jobExecutionContext.getOutput();
-//                    for (Iterator<String> iterator = output.getNames(); iterator.hasNext(); ) {
-//                        String paramName = iterator.next();
-//                        ActualParameter actualParameter = (ActualParameter) output.getValue(paramName);
-//						if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-//							URI outputURI = GfacUtils.createGsiftpURI(endpoint,app.getOutputDataDirectory());
-//							List<String> outputList = ftp.listDir(outputURI,gssCred);
-//							String[] valueList = outputList.toArray(new String[outputList.size()]);
-//							((URIArrayType) actualParameter.getType()).setValueArray(valueList);
-//							stringMap = new HashMap<String, ActualParameter>();
-//							stringMap.put(paramName, actualParameter);
-//							jobExecutionContext.getExecutionContext().getNotifier().output(jobExecutionContext, actualParameter.toString());
-//						}
-//                        if ("StringArray".equals(actualParameter.getType().getType().toString())) {
-//                            String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
-//                            ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
-//                            stringMap = new HashMap<String, ActualParameter>();
-//                            stringMap.put(paramName, actualParameter);
-//                            jobExecutionContext.getExecutionContext().getNotifier().output(jobExecutionContext, actualParameter.toString());
-//                        }
-//                    	else{
-//                    	// This is to handle exception during the output parsing.
-//                        stringMap = OutputUtils.fillOutputFromStdout(jobExecutionContext.<ActualParameter>getOutput(), stdout,stderr);
-//                        String paramValue = output.getStringValue(paramName);
-//                        if(paramValue == null || paramValue.isEmpty()){
-//                            int errCode = listener.getError();
-//                            String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress();
-//                            JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
-//                                    gateKeeper, job.getRSL(), jobExecutionContext);
-//                            errorReason(errCode, error);
-//                            jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,error,
-//                                    readLastLinesofStdOut(localStdErrFile.getPath(), 20));
-//                            throw error;
-//                        }
-//                        }
-//                    }
-//                    if(stringMap == null || stringMap.isEmpty()){
+                    Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+                    Set<String> keys = output.keySet();
+                    for (String paramName : keys) {
+                        ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+                        if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                            URI outputURI = GFacUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+                            List<String> outputList = ftp.listDir(outputURI, gssCred);
+                            String[] valueList = outputList.toArray(new String[outputList.size()]);
+                            ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
+                            stringMap = new HashMap<String, ActualParameter>();
+                            stringMap.put(paramName, actualParameter);
+                        }
+                        if ("StringArray".equals(actualParameter.getType().getType().toString())) {
+                            String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
+                            ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
+                            stringMap = new HashMap<String, ActualParameter>();
+                            stringMap.put(paramName, actualParameter);
+                        } else {
+                            // This is to handle exception during the output parsing.
+                            stringMap = OutputUtils.fillOutputFromStdout(jobExecutionContext, stdout, stderr);
+                        }
+                    }
+                    if (stringMap == null || stringMap.isEmpty()) {
+                        jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(new Throwable("Empty Output returned from the Application, Double check the application" +
+                                "and ApplicationDescriptor output Parameter Names")));
 //                    	GFacProviderException exception = new GFacProviderException("Gram provider: Error creating job output", jobExecutionContext);
 //                    	 jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,exception,exception.getLocalizedMessage());
 //                         throw exception;
-//                    }
-//                    // If users has given an output DAta poth we download the output files in to that directory, this will be apath in the machine where GFac is installed
-//                    if(WorkflowContextHeaderBuilder.getCurrentContextHeader() != null &&
-//                            WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowOutputDataHandling() != null){
-//                        WorkflowOutputDataHandlingDocument.WorkflowOutputDataHandling workflowOutputDataHandling =
-//                                WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowOutputDataHandling();
-//                        if(workflowOutputDataHandling.getApplicationOutputDataHandlingArray().length != 0){
-//                            String outputDataDirectory = workflowOutputDataHandling.getApplicationOutputDataHandlingArray()[0].getOutputDataDirectory();
-//                            if(outputDataDirectory != null && !"".equals(outputDataDirectory)){
-//                                stageOutputFiles(jobExecutionContext,outputDataDirectory);
-//                            }
-//                        }
-//                    }
-//                    return stringMap;
-//                }catch (XmlException e) {
-//                    throw new GFacProviderException(e.getMessage(),jobExecutionContext, e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
-//                }
-                }catch (ToolsException e) {
-                    throw new GFacProviderException(e.getMessage(),jobExecutionContext, e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+                    }
+                    //todo check the workflow context header and run the stageOutputFiles method to stage the output files into a user-defined location
+                } catch (ToolsException e) {
+                    throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
                 } catch (URISyntaxException e) {
                     throw new GFacProviderException("URI is malformatted:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
-                }catch (NullPointerException e) {
+                } catch (NullPointerException e) {
                     throw new GFacProviderException("Output is not produced in stdout:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
                 }
             }
-                          //todo this return has to be removed
-              return null;
-
+            //todo this return has to be removed
+            return stringMap;
         } catch (Exception e) {
 //            jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
-            throw new GFacProviderException(e.getMessage(), jobExecutionContext,e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+            throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
         }
 
     }
 
-     private static String readLastLinesofStdOut(String path, int count) {
+    private static String readLastLinesofStdOut(String path, int count) {
         StringBuffer buffer = new StringBuffer();
         FileInputStream in = null;
         try {
@@ -244,12 +215,12 @@ public class GramProviderUtils {
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
         }
         if (numberofLines > count) {
-             for (int i = numberofLines - count; i < numberofLines; i++) {
+            for (int i = numberofLines - count; i < numberofLines; i++) {
                 buffer.append(strLine.get(i));
                 buffer.append("\n");
             }
-        }else{
-             for (int i = 0; i < numberofLines; i++) {
+        } else {
+            for (int i = 0; i < numberofLines; i++) {
                 buffer.append(strLine.get(i));
                 buffer.append("\n");
             }
@@ -261,4 +232,126 @@ public class GramProviderUtils {
         }
         return buffer.toString();
     }
+
+    // Stages every URI / URIArray output parameter into outputFileStagingPath through doStaging, rewrites the
+    // parameter values to the staged paths and replaces the job's out-message context with the rewritten one.
+    // Throws GFacProviderException when the credential cannot be obtained or when a URI/transfer step fails.
+    private static void stageOutputFiles(JobExecutionContext jobExecutionContext, String outputFileStagingPath) throws GFacProviderException {
+        //TODO: Review this with type
+        GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+        GridFtp ftp = new GridFtp();
+        GSSCredential gssCred;
+        try {
+            gssCred = new GSISecurityContext(jobExecutionContext.getGFacConfiguration()).getGssCredentails();
+        } catch (SecurityException e) {
+            throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+        }
+        MessageContext outputNew = new MessageContext();
+        Map<String, Object> parameters = jobExecutionContext.getOutMessageContext().getParameters();
+        for (String paramName : parameters.keySet()) {
+            ActualParameter actualParameter = (ActualParameter) parameters.get(paramName);
+            String type = actualParameter.getType().getType().toString();
+            try {
+                if ("URI".equals(type)) {
+                    String remoteFile = MappingFactory.toString(actualParameter); // read once, before setValue overwrites it
+                    for (String endpoint : host.getGridFTPEndPointArray()) {
+                        ((URIParameterType) actualParameter.getType()).setValue(doStaging(outputFileStagingPath, remoteFile, ftp, gssCred, endpoint));
+                    }
+                } else if ("URIArray".equals(type)) {
+                    String[] remoteFiles = MappingFactory.toString(actualParameter).split(",");
+                    for (String endpoint : host.getGridFTPEndPointArray()) {
+                        // fresh list per endpoint, otherwise every additional endpoint appends duplicate paths
+                        List<String> newFiles = new ArrayList<String>();
+                        for (String paramValueEach : remoteFiles) {
+                            newFiles.add(doStaging(outputFileStagingPath, paramValueEach, ftp, gssCred, endpoint));
+                        }
+                        ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+                    }
+                }
+            } catch (URISyntaxException e) {
+                throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+            } catch (ToolsException e) {
+                throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+            }
+            outputNew.getParameters().put(paramName, actualParameter);
+        }
+        jobExecutionContext.setOutMessageContext(outputNew);
+    }
+
+    // Builds the gsiftp URI of paramValue on the given endpoint and hands it to ftp.readRemoteFile together with
+    // a local File named after the remote file inside outputFileStagingPath; returns that local path.
+    private static String doStaging(String outputFileStagingPath, String paramValue, GridFtp ftp, GSSCredential gssCred, String endpoint) throws URISyntaxException, ToolsException {
+        URI remoteFile = GFacUtils.createGsiftpURI(endpoint, paramValue);
+        String localPath = outputFileStagingPath + File.separator + new File(remoteFile.getPath()).getName();
+        ftp.readRemoteFile(remoteFile, gssCred, new File(localPath));
+        return localPath;
+    }
+
+    // Stages one input on every GridFTP endpoint of the host, under the application's input data directory.
+    // gsiftp/file/http sources are uploaded; a URI without a file name or with another scheme is returned as is.
+    // Returns the path of the copy created for the last endpoint; throws IOException when no endpoint exists.
+    private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws URISyntaxException, SecurityException, ToolsException, IOException {
+        URI gridftpURL = new URI(paramValue);
+        GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+        GridFtp ftp = new GridFtp();
+        GSSCredential gssCred = new GSISecurityContext(jobExecutionContext.getGFacConfiguration()).getGssCredentails();
+        String fileName = new File(gridftpURL.getPath()).getName();
+        boolean isGsiftp = paramValue.startsWith("gsiftp");
+        boolean isLocalFile = paramValue.startsWith("file");
+        if ("".equals(fileName) || !(isGsiftp || isLocalFile || paramValue.startsWith("http"))) {
+            return paramValue; // nothing to transfer; todo throw exception telling unsupported protocol
+        }
+        String[] endpoints = host.getGridFTPEndPointArray();
+        if (endpoints == null || endpoints.length == 0) {
+            throw new IOException("No GridFTP endpoint available to stage input file " + paramValue);
+        }
+        URI destURI = null;
+        for (String endpoint : endpoints) {
+            URI inputURI = GFacUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+            destURI = GFacUtils.createGsiftpURI(endpoint, inputURI.getPath() + File.separator + fileName);
+            if (isGsiftp) {
+                ftp.uploadFile(gridftpURL, destURI, gssCred);
+                continue;
+            }
+            // this method opened the stream, so it closes it even when the upload fails
+            InputStream source = isLocalFile ? new FileInputStream(paramValue.substring(paramValue.indexOf(":") + 1)) : gridftpURL.toURL().openStream();
+            try {
+                ftp.uploadFile(destURI, gssCred, source);
+            } finally {
+                source.close();
+            }
+        }
+        return destURI.getPath();
+    }
+
+    // Runs stageInputFiles for every URI / URIArray input, stores the returned paths back into the parameter and
+    // then replaces the job's in-message context with a new one holding all parameters. Always returns null.
+    public static Map<String, ?> processInput(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        MessageContext stagedInput = new MessageContext();
+        try {
+            Map<String, Object> original = jobExecutionContext.getInMessageContext().getParameters();
+            for (Map.Entry<String, Object> entry : original.entrySet()) {
+                ActualParameter parameter = (ActualParameter) entry.getValue();
+                String rawValue = MappingFactory.toString(parameter);
+                String typeName = parameter.getType().getType().toString();
+                //TODO: Review this with type
+                if ("URI".equals(typeName)) {
+                    ((URIParameterType) parameter.getType()).setValue(stageInputFiles(jobExecutionContext, rawValue));
+                } else if ("URIArray".equals(typeName)) {
+                    String[] sources = rawValue.split(",");
+                    String[] staged = new String[sources.length];
+                    for (int i = 0; i < sources.length; i++) {
+                        staged[i] = stageInputFiles(jobExecutionContext, sources[i]);
+                    }
+                    ((URIArrayType) parameter.getType()).setValueArray(staged);
+                }
+                stagedInput.getParameters().put(entry.getKey(), parameter);
+            }
+        } catch (Exception e) {
+            throw new GFacProviderException("Error while input File Staging", jobExecutionContext, e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(stagedInput);
+        return null;
+    }
 }

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java?rev=1441646&r1=1441645&r2=1441646&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/OutputUtils.java Fri Feb  1 21:59:14 2013
@@ -29,14 +29,12 @@ import org.apache.airavata.schemas.gfac.
 import org.apache.airavata.schemas.gfac.StdOutParameterType;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class OutputUtils {
-    private OutputUtils() {
-    }
+    private static String regexPattern = "\\s*=\\s*([^\\[\\s'\"][^\\s]*|\"[^\"]*\"|'[^']*'|\\[[^\\[]*\\])";
 
     public static Map<String, ActualParameter> fillOutputFromStdout(JobExecutionContext context, String stdout, String stderr) throws Exception {
 
@@ -65,11 +63,12 @@ public class OutputUtils {
                 }
             }
         }
+
         return result;
     }
 
     private static String parseStdout(String stdout, String outParam) throws Exception {
-        String regex = Pattern.quote(outParam) + "\\s*=\\s*([^\\[\\s'\"][^\\s]*|\"[^\"]*\"|'[^']*'|\\[[^\\[]*\\])";
+        String regex = Pattern.quote(outParam) + regexPattern;
         String match = null;
         Pattern pattern = Pattern.compile(regex);
         Matcher matcher = pattern.matcher(stdout);
@@ -83,4 +82,19 @@ public class OutputUtils {
             throw new Exception("Data for the output parameter '" + outParam + "' was not found");
         }
     }
+
+    // Collects the value of every "outParam = value" occurrence found in stdout.
+    // The buffer itself is never null, so "nothing matched" is detected by its
+    // length; in that case an Exception naming the missing parameter is thrown.
+    public static String[] parseStdoutArray(String stdout, String outParam) throws Exception {
+        StringBuffer match = new StringBuffer();
+        Matcher matcher = Pattern.compile(Pattern.quote(outParam) + regexPattern).matcher(stdout);
+        while (matcher.find()) {
+            match.append(matcher.group(1)).append(",");
+        }
+        if (match.length() == 0) {
+            throw new Exception("Data for the output parameter '" + outParam + "' was not found");
+        }
+        return match.toString().split(",");
+    }
 }