You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/02/06 00:10:18 UTC
svn commit: r1442792 - in
/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac:
./ context/ provider/ utils/
Author: lahiru
Date: Tue Feb 5 23:10:18 2013
New Revision: 1442792
URL: http://svn.apache.org/viewvc?rev=1442792&view=rev
Log:
more changes to new gfac implementation.
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java Tue Feb 5 23:10:18 2013
@@ -22,14 +22,21 @@
package org.apache.airavata.gfac;
public class Constants {
- public static final String XPATH_EXPR_MYPROXY_SERVER = "/GFac/MyProxy/Server/text()";
- public static final String XPATH_EXPR_MYPROXY_USER = "/GFac/MyProxy/User/text()";
- public static final String XPATH_EXPR_MYPROXY_PASSPHRASE = "/GFac/MyProxy/Passphrase/text()";
- public static final String XPATH_EXPR_MYPROXY_LIFECYCLE = "/GFac/MyProxy/LifeCycle/text()";
- public static final String XPATH_EXPR_INFLOW_HANDLERS = "/GFac/Handlers/InFlow/Handler";
- public static final String XPATH_EXPR_OUTFLOW_HANDLERS = "/GFac/Handlers/OutFlow/Handler";
+ public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
+ public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
+
+ public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='";
+ public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+
+
+ public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='";
+ public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+
public static final String GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE = "class";
+ public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class";
public static final String NEWLINE = System.getProperty("line.separator");
public static final String INPUT_DATA_DIR_VAR_NAME = "inputData";
public static final String OUTPUT_DATA_DIR_VAR_NAME = "outputData";
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java Tue Feb 5 23:10:18 2013
@@ -1,5 +1,7 @@
package org.apache.airavata.gfac;
+import com.amazonaws.services.glacier.internal.TreeHashInputStream;
+import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.exception.UnspecifiedApplicationSettingsException;
@@ -18,9 +20,7 @@ import javax.xml.parsers.ParserConfigura
import javax.xml.xpath.*;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
public class GFacConfiguration {
public static final Logger log = LoggerFactory.getLogger(GFacConfiguration.class);
@@ -42,6 +42,7 @@ public class GFacConfiguration {
private String trustedCertLocation;
+ private static Document handlerDoc;
// Keep list of full qualified class names of GFac handlers which should invoked before
// the provider
private List<String> inHandlers = new ArrayList<String>();
@@ -52,33 +53,34 @@ public class GFacConfiguration {
private static List<GridConfigurationHandler> gridConfigurationHandlers;
- private static String GRID_HANDLERS="airavata.grid.handlers";
+ private static String GRID_HANDLERS = "airavata.grid.handlers";
- static{
- gridConfigurationHandlers=new ArrayList<GridConfigurationHandler>();
- try {
- String handlerString = ServerSettings.getSetting(GRID_HANDLERS);
- String[] handlers = handlerString.split(",");
- for (String handlerClass : handlers) {
- try {
- @SuppressWarnings("unchecked")
- Class<GridConfigurationHandler> classInstance = (Class<GridConfigurationHandler>) GFacConfiguration.class
- .getClassLoader().loadClass(handlerClass);
- gridConfigurationHandlers.add(classInstance.newInstance());
- } catch (Exception e) {
- log.error("Error while loading Grid Configuration Handler class "+handlerClass, e);
- }
- }
- } catch (UnspecifiedApplicationSettingsException e) {
- //no handlers defined
- } catch (ApplicationSettingsException e1) {
- log.error("Error in reading Configuration handler data!!!",e1);
- }
+ static {
+ gridConfigurationHandlers = new ArrayList<GridConfigurationHandler>();
+ try {
+ String handlerString = ServerSettings.getSetting(GRID_HANDLERS);
+ String[] handlers = handlerString.split(",");
+ for (String handlerClass : handlers) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<GridConfigurationHandler> classInstance = (Class<GridConfigurationHandler>) GFacConfiguration.class
+ .getClassLoader().loadClass(handlerClass);
+ gridConfigurationHandlers.add(classInstance.newInstance());
+ } catch (Exception e) {
+ log.error("Error while loading Grid Configuration Handler class " + handlerClass, e);
+ }
+ }
+ } catch (UnspecifiedApplicationSettingsException e) {
+ //no handlers defined
+ } catch (ApplicationSettingsException e1) {
+ log.error("Error in reading Configuration handler data!!!", e1);
+ }
}
- public static GridConfigurationHandler[] getGridConfigurationHandlers(){
- return gridConfigurationHandlers.toArray(new GridConfigurationHandler[]{});
+ public static GridConfigurationHandler[] getGridConfigurationHandlers() {
+ return gridConfigurationHandlers.toArray(new GridConfigurationHandler[]{});
}
+
public GFacConfiguration(AiravataAPI airavataAPI, Properties configurationProperties) {
this.airavataAPI = airavataAPI;
if (configurationProperties != null) {
@@ -90,9 +92,10 @@ public class GFacConfiguration {
} else {
throw new NullPointerException("GFac Configuration properties cannot be null.");
}
+
}
- public GFacConfiguration(AiravataAPI airavataAPI){
+ public GFacConfiguration(AiravataAPI airavataAPI) {
this.airavataAPI = airavataAPI;
}
@@ -121,11 +124,13 @@ public class GFacConfiguration {
}
public List<String> getInHandlers() {
- return inHandlers;
+ // De-duplicate (keeping first-seen order) so a handler listed more than once in gfac-config.xml appears only once
+ return removeDuplicateWithOrder(inHandlers);
}
public List<String> getOutHandlers() {
- return outHandlers;
+ // De-duplicate (keeping first-seen order) so a handler listed more than once in gfac-config.xml appears only once
+ return removeDuplicateWithOrder(outHandlers);
}
public void setMyProxyServer(String myProxyServer) {
@@ -156,25 +161,70 @@ public class GFacConfiguration {
this.outHandlers = outHandlers;
}
+ public void setInHandlers(String providerName, String applicationName) {
+ try {
+ this.inHandlers = xpathGetAttributeValueList(handlerDoc, Constants.XPATH_EXPR_GLOBAL_INFLOW_HANDLERS, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE);
+ if (applicationName != null) {
+ String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END;
+ List<String> strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE);
+ this.inHandlers.addAll(strings);
+ }
+ if (providerName != null) {
+ String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END;
+ List<String> strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE);
+ this.inHandlers.addAll(strings);
+ }
+ } catch (XPathExpressionException e) {
+ new GFacException("Error parsing Handler Configuration", e);
+ }
+ }
+
+ public void setOutHandlers(String providerName, String applicationName) {
+ try {
+ this.outHandlers = xpathGetAttributeValueList(handlerDoc, Constants.XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE);
+ if (applicationName != null) {
+ String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END;
+ List<String> strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE);
+ this.outHandlers.addAll(strings);
+ } else if (providerName != null) {
+ String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END;
+ List<String> strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE);
+ this.outHandlers.addAll(strings);
+ }
+ } catch (XPathExpressionException e) {
+ new GFacException("Error parsing Handler Configuration", e);
+ }
+ }
+
/**
* Parse GFac configuration file and populate GFacConfiguration object. XML configuration
* file for GFac will look like below.
- *
+ * <p/>
* <GFac>
- * <MyProxy>
- * <Server></Server>
- * <User></User>
- * <Passphrase></Passphrase>
- * <LifeCycle></LifeCycle>
- * </MyProxy>
- * <Handlers>
- * <InFlow>
- * <Handler class="org.apache.airavata.gfac.handler.impl.HadoopDeploymentHandler"/>
- * </InFlow>
- * <OutFlow>
- * <Handler class="org.apache.airavata.gfac.handler.impl.HadoopDeploymentHandler"/>
- * </OutFlow>
- * </Handlers>
+ * <GlobalHandlers>
+ * <InHandlers>
+ * <Handler class="org.apache.airavata.gfac.GlobalHandler1"/>
+ * </InHandlers>
+ * <OutHandlers>
+ * <Handler class="org.apache.airavata.gfac.GlobalHandler2"/>
+ * </OutHandlers>
+ * </GlobalHandlers>
+ * <Provider class="org.apache.airavata.gfac.providers.LocalProvider" host="LocalHost">
+ * <InHandlers>
+ * <Handler class="org.apache.airavata.gfac.handlers.LocalEvenSetupHandler"/>
+ * </InHandlers>
+ * <OutHandlers>
+ * <Handler class="org.apache.airavata.LocalOutHandler1"/>
+ * </OutHandlers>
+ * </Provider>
+ * <Application name="UltraScan">
+ * <InHandlers>
+ * <Handler class="org.apache.airavata.gfac.handlers.LocalEvenSetupHandler"/>
+ * </InHandlers>
+ * <OutHandlers>
+ * <Handler class="org.apache.airavata.gfac.LocalOutHandler1"/>
+ * </OutHandlers>
+ * </Application>
* </GFac>
*
* @param configFile configuration file
@@ -183,24 +233,8 @@ public class GFacConfiguration {
public static GFacConfiguration create(File configFile, AiravataAPI airavataAPI) throws ParserConfigurationException, IOException, SAXException, XPathExpressionException {
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
- Document doc = docBuilder.parse(configFile);
-
+ handlerDoc = docBuilder.parse(configFile);
GFacConfiguration configuration = new GFacConfiguration(airavataAPI);
-
- configuration.setMyProxyServer(xpathGetText(doc, Constants.XPATH_EXPR_MYPROXY_SERVER));
- configuration.setMyProxyUser(xpathGetText(doc, Constants.XPATH_EXPR_MYPROXY_USER));
- configuration.setMyProxyPassphrase(xpathGetText(doc,
- Constants.XPATH_EXPR_MYPROXY_PASSPHRASE));
- configuration.setMyProxyLifeCycle(Integer.parseInt(
- xpathGetText(doc, Constants.XPATH_EXPR_MYPROXY_LIFECYCLE)));
-
- configuration.setInHandlers(xpathGetAttributeValueList(doc,
- Constants.XPATH_EXPR_INFLOW_HANDLERS,
- Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE));
- configuration.setOutHandlers(xpathGetAttributeValueList(doc,
- Constants.XPATH_EXPR_OUTFLOW_HANDLERS,
- Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE));
-
return configuration;
}
@@ -209,35 +243,50 @@ public class GFacConfiguration {
XPath xPath = xPathFactory.newXPath();
XPathExpression expr = xPath.compile(expression);
- return (String)expr.evaluate(doc, XPathConstants.STRING);
+ return (String) expr.evaluate(doc, XPathConstants.STRING);
}
/**
* Select matching node set and extract specified attribute value.
- * @param doc XML document
- * @param expression expression to match node set
- * @param attribute name of the attribute to extract
+ *
+ * @param doc XML document
+ * @param expression expression to match node set
+ * @param attribute name of the attribute to extract
* @return list of attribute values.
* @throws XPathExpressionException
*/
- private static List<String> xpathGetAttributeValueList(Document doc, String expression, String attribute) throws XPathExpressionException{
+ private static List<String> xpathGetAttributeValueList(Document doc, String expression, String attribute) throws XPathExpressionException {
+ System.out.println(expression);
XPathFactory xPathFactory = XPathFactory.newInstance();
XPath xPath = xPathFactory.newXPath();
XPathExpression expr = xPath.compile(expression);
- NodeList nl = (NodeList)expr.evaluate(doc, XPathConstants.NODESET);
+ NodeList nl = (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
List<String> attributeValues = new ArrayList<String>();
- for(int i = 0; i < nl.getLength(); i++){
- attributeValues.add(((Element)nl.item(i)).getAttribute(attribute));
+ for (int i = 0; i < nl.getLength(); i++) {
+ attributeValues.add(((Element) nl.item(i)).getAttribute(attribute));
}
return attributeValues;
}
- public static GFacConfiguration create(Properties configProps){
+ public static GFacConfiguration create(Properties configProps) {
return null;
}
+ private static List removeDuplicateWithOrder(List arlList) {
+ Set set = new HashSet();
+ List newList = new ArrayList();
+ for (Iterator iter = arlList.iterator(); iter.hasNext(); ) {
+ Object element = iter.next();
+ if (set.add(element))
+ newList.add(element);
+ }
+ arlList.clear();
+ arlList.addAll(newList);
+ return arlList;
+ }
+
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java Tue Feb 5 23:10:18 2013
@@ -46,9 +46,6 @@ public class Scheduler {
*/
public static void schedule(JobExecutionContext jobExecutionContext) {
// Current implementation only support static handler sequence.
- GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- jobExecutionContext.setInHandlers(gFacConfiguration.getInHandlers());
- jobExecutionContext.setOutHandlers(gFacConfiguration.getOutHandlers());
jobExecutionContext.setProvider(getProvider(jobExecutionContext));
// TODO: Selecting the provider based on application description.
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java Tue Feb 5 23:10:18 2013
@@ -105,6 +105,10 @@ public class JobExecutionContext extends
}
public void setProvider(GFacProvider provider) {
+ this.gfacConfiguration.setInHandlers(provider.getClass().getName(),
+ this.getApplicationContext().getApplicationDeploymentDescription().getType().getApplicationName().getStringValue());
+ this.gfacConfiguration.setOutHandlers(provider.getClass().getName(),
+ this.getApplicationContext().getApplicationDeploymentDescription().getType().getApplicationName().getStringValue());
this.provider = provider;
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java Tue Feb 5 23:10:18 2013
@@ -25,6 +25,7 @@ import org.apache.airavata.gfac.context.
import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
public class GFacProviderException extends Exception {
+ private String aditionalInfo[] = null;
public GFacProviderException(String message) {
super(message);
@@ -46,6 +47,7 @@ public class GFacProviderException exten
public GFacProviderException(String message, JobExecutionContext context,Exception e,String... additionExceptiondata) {
super(message);
+ this.aditionalInfo = additionExceptiondata;
sendFaultNotification(message,context,e, additionExceptiondata);
}
@@ -57,4 +59,8 @@ public class GFacProviderException exten
}
executionContext.getNotifier().publish(new ExecutionFailEvent(e));
}
+
+ public String[] getAditionalInfo() {
+ return aditionalInfo;
+ }
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java Tue Feb 5 23:10:18 2013
@@ -43,10 +43,6 @@ public class GramProvider implements GFa
// This method precpare the environment before the application invocation.
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- GramProviderUtils.makeDirectory(jobExecutionContext);
- //Note this step has to be done before setupEnvironment,otherwise input file path adjusting based on the
- //application hosted machien will not reflect in the RSL
- GramProviderUtils.processInput(jobExecutionContext);
job = GramProviderUtils.setupEnvironment(jobExecutionContext);
listener = new GramJobSubmissionListener(job, jobExecutionContext);
job.addListener(listener);
@@ -134,6 +130,5 @@ public class GramProvider implements GFa
}
public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- GramProviderUtils.processOutput(jobExecutionContext);
}
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1442792&r1=1442791&r2=1442792&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java Tue Feb 5 23:10:18 2013
@@ -30,7 +30,6 @@ import org.apache.airavata.gfac.external
import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.schemas.gfac.*;
-import org.apache.xmlbeans.XmlException;
import org.globus.gram.GramAttributes;
import org.globus.gram.GramJob;
import org.ietf.jgss.GSSCredential;
@@ -109,249 +108,6 @@ public class GramProviderUtils {
}
}
- public static Map<String, ?> processOutput(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- GridFtp ftp = new GridFtp();
- File localStdErrFile = null;
- Map<String, ActualParameter> stringMap = null;
- try {
- GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
- GSSCredential gssCred = gssContext.getGssCredentails();
-
- String[] hostgridFTP = host.getGridFTPEndPointArray();
- if (hostgridFTP == null || hostgridFTP.length == 0) {
- hostgridFTP = new String[]{host.getHostAddress()};
- }
- GFacProviderException pe = null;
- for (String endpoint : host.getGridFTPEndPointArray()) {
- try {
- /*
- * Read Stdout and Stderror
- */
- URI stdoutURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardOutput());
- URI stderrURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardError());
-
- log.info("STDOUT:" + stdoutURI.toString());
- log.info("STDERR:" + stderrURI.toString());
-
- File logDir = new File("./service_logs");
- if (!logDir.exists()) {
- logDir.mkdir();
- }
-
- String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext
- .getServiceName());
- File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
- localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
-
- String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
- String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- Set<String> keys = output.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) output.get(paramName);
- if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- URI outputURI = GFacUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
- List<String> outputList = ftp.listDir(outputURI, gssCred);
- String[] valueList = outputList.toArray(new String[outputList.size()]);
- ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
- stringMap = new HashMap<String, ActualParameter>();
- stringMap.put(paramName, actualParameter);
- }
- if ("StringArray".equals(actualParameter.getType().getType().toString())) {
- String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
- ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
- stringMap = new HashMap<String, ActualParameter>();
- stringMap.put(paramName, actualParameter);
- } else {
- // This is to handle exception during the output parsing.
- stringMap = OutputUtils.fillOutputFromStdout(jobExecutionContext, stdout, stderr);
- }
- }
- if (stringMap == null || stringMap.isEmpty()) {
- jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(new Throwable("Empty Output returned from the Application, Double check the application" +
- "and ApplicationDescriptor output Parameter Names")));
-// GFacProviderException exception = new GFacProviderException("Gram provider: Error creating job output", jobExecutionContext);
-// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,exception,exception.getLocalizedMessage());
-// throw exception;
- }
- //todo check the workflow context header and run the stateOutputFiles method to stage the output files in to a user defined location
- } catch (ToolsException e) {
- throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- } catch (URISyntaxException e) {
- throw new GFacProviderException("URI is malformatted:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- } catch (NullPointerException e) {
- throw new GFacProviderException("Output is not produced in stdout:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- }
- }
- //todo this return has to be removed
- return stringMap;
- } catch (Exception e) {
-// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
- }
- }
- private static String readLastLinesofStdOut(String path, int count) {
- StringBuffer buffer = new StringBuffer();
- FileInputStream in = null;
- try {
- in = new FileInputStream(path);
- } catch (FileNotFoundException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- List<String> strLine = new ArrayList<String>();
- String tmp = null;
- int numberofLines = 0;
- try {
- while ((tmp = br.readLine()) != null) {
- strLine.add(tmp);
- numberofLines++;
- }
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- if (numberofLines > count) {
- for (int i = numberofLines - count; i < numberofLines; i++) {
- buffer.append(strLine.get(i));
- buffer.append("\n");
- }
- } else {
- for (int i = 0; i < numberofLines; i++) {
- buffer.append(strLine.get(i));
- buffer.append("\n");
- }
- }
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- return buffer.toString();
- }
-
- private static void stageOutputFiles(JobExecutionContext jobExecutionContext, String outputFileStagingPath) throws GFacProviderException {
- MessageContext outputNew = new MessageContext();
- MessageContext output = jobExecutionContext.getOutMessageContext();
- Map<String, Object> parameters = output.getParameters();
- for (String paramName : parameters.keySet()) {
- ActualParameter actualParameter = (ActualParameter) parameters
- .get(paramName);
- //TODO: Review this with type
- GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
- GridFtp ftp = new GridFtp();
-
- GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
- GSSCredential gssCred = null;
- try {
- gssCred = gssContext.getGssCredentails();
- } catch (SecurityException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- try {
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- for (String endpoint : host.getGridFTPEndPointArray()) {
- ((URIParameterType) actualParameter.getType()).setValue(doStaging(outputFileStagingPath,
- MappingFactory.toString(actualParameter), ftp, gssCred, endpoint));
- }
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(MappingFactory.toString(actualParameter).split(","));
- List<String> newFiles = new ArrayList<String>();
- for (String endpoint : host.getGridFTPEndPointArray()) {
- for (String paramValueEach : split) {
- newFiles.add(doStaging(outputFileStagingPath, paramValueEach, ftp, gssCred, endpoint));
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
-
- }
- } catch (URISyntaxException e) {
- throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
- } catch (ToolsException e) {
- throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
- }
- outputNew.getParameters().put(paramName, actualParameter);
- }
- jobExecutionContext.setOutMessageContext(outputNew);
- }
-
- private static String doStaging(String outputFileStagingPath, String paramValue, GridFtp ftp, GSSCredential gssCred, String endpoint) throws URISyntaxException, ToolsException {
- URI srcURI = GFacUtils.createGsiftpURI(endpoint, paramValue);
- String fileName = new File(srcURI.getPath()).getName();
- File outputFile = new File(outputFileStagingPath + File.separator + fileName);
- ftp.readRemoteFile(srcURI,
- gssCred, outputFile);
- return outputFileStagingPath + File.separator + fileName;
- }
-
- private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws URISyntaxException, SecurityException, ToolsException, IOException {
- URI gridftpURL;
- gridftpURL = new URI(paramValue);
- GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- GridFtp ftp = new GridFtp();
- URI destURI = null;
- GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
- GSSCredential gssCred = gssContext.getGssCredentails();
-
- for (String endpoint : host.getGridFTPEndPointArray()) {
- URI inputURI = GFacUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
- String fileName = new File(gridftpURL.getPath()).getName();
- String s = inputURI.getPath() + File.separator + fileName;
- //if user give a url just to refer an endpoint, not a web resource we are not doing any transfer
- if (fileName != null && !"".equals(fileName)) {
- destURI = GFacUtils.createGsiftpURI(endpoint, s);
- if (paramValue.startsWith("gsiftp")) {
- ftp.uploadFile(gridftpURL, destURI, gssCred);
- } else if (paramValue.startsWith("file")) {
- String localFile = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
- ftp.uploadFile(destURI, gssCred, new FileInputStream(localFile));
- } else if (paramValue.startsWith("http")) {
- ftp.uploadFile(destURI,
- gssCred, (gridftpURL.toURL().openStream()));
- } else {
- //todo throw exception telling unsupported protocol
- return paramValue;
- }
- } else {
- // When the given input is not a web resource but a URI type input, then we don't do any transfer just keep the same value as it isin the input
- return paramValue;
- }
- }
- System.out.println(destURI.getPath());
- return destURI.getPath();
- }
-
- public static Map<String, ?> processInput(JobExecutionContext jobExecutionContext)
- throws GFacProviderException {
- MessageContext inputNew = new MessageContext();
- try {
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName:parameters) {
- ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
- String paramValue = MappingFactory.toString(actualParameter);
- //TODO: Review this with type
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(paramValue.split(","));
- List<String> newFiles = new ArrayList<String>();
- for (String paramValueEach : split) {
- newFiles.add(stageInputFiles(jobExecutionContext, paramValueEach));
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
- inputNew.getParameters().put(paramName, actualParameter);
- }
- } catch (Exception e) {
-// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,"Error during Input File staging");
- throw new GFacProviderException("Error while input File Staging", jobExecutionContext, e, e.getLocalizedMessage());
- }
- jobExecutionContext.setInMessageContext(inputNew);
- return null;
- }
}