You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2012/05/31 23:28:45 UTC
svn commit: r1344920 -
/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java
Author: raminder
Date: Thu May 31 21:28:44 2012
New Revision: 1344920
URL: http://svn.apache.org/viewvc?rev=1344920&view=rev
Log:
Fixed AIRAVATA-436 to handle URIArray output types and AIRAVATA-442 for gram errors
Modified:
incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java
Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java?rev=1344920&r1=1344919&r2=1344920&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java Thu May 31 21:28:44 2012
@@ -28,6 +28,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,7 @@ import org.apache.airavata.schemas.gfac.
import org.apache.airavata.schemas.gfac.URIArrayType;
import org.apache.airavata.schemas.gfac.URIParameterType;
import org.apache.airavata.schemas.wec.WorkflowOutputDataHandlingDocument;
+import org.apache.xmlbeans.XmlException;
import org.globus.gram.GramAttributes;
import org.globus.gram.GramException;
import org.globus.gram.GramJob;
@@ -173,8 +175,8 @@ public class GramProvider extends Abstra
buf.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
.append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
- .append(" tempDirectory = ").append(app.getScratchWorkingDirectory())
- .append(" Globus GateKeeper cantact = ").append(gateKeeper);
+ .append(" temp directory = ").append(app.getScratchWorkingDirectory())
+ .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
invocationContext.getExecutionContext().getNotifier().info(invocationContext, buf.toString());
/*
@@ -209,43 +211,20 @@ public class GramProvider extends Abstra
*/
int jobStatus = listener.getStatus();
- if(job.getExitCode() != 0){
+ if(job.getExitCode() != 0 || jobStatus == GramJob.STATUS_FAILED){
int errCode = listener.getError();
String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress() + " Job Exit Code = "
- + job.getExitCode();
+ + listener.getError();
JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
gateKeeper, job.getRSL());
- if (errCode == 8) {
- error.setReason(JobSubmissionFault.JOB_CANCEL);
- } else {
- error.setReason(JobSubmissionFault.JOB_FAILED + " With Exit Code:" + job.getExitCode());
- }
- invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
- throw error;
- }
- if (jobStatus == GramJob.STATUS_FAILED) {
- int errCode = listener.getError();
- String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress() + " Error Code = "
- + errCode;
- JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
- gateKeeper, job.getRSL());
- if (errCode == 8) {
- error.setReason(JobSubmissionFault.JOB_CANCEL);
- } else {
- error.setReason(JobSubmissionFault.JOB_FAILED);
- }
+ errorReason(errCode, error);
invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
throw error;
}
-
- } catch (GramException e) {
+ } catch (GramException e) {
JobSubmissionFault error = new JobSubmissionFault(this, e, host.getHostAddress(), gateKeeper, job.getRSL());
- if (listener.getError() == 8) {
- error.setReason(JobSubmissionFault.JOB_CANCEL);
- } else {
- error.setReason(JobSubmissionFault.JOB_FAILED);
- }
- throw error;
+ int errCode = listener.getError();
+ throw errorReason(errCode, error);
} catch (GSSException e) {
throw new ProviderException(e.getMessage(), e);
} catch (InterruptedException e) {
@@ -263,6 +242,15 @@ public class GramProvider extends Abstra
}
+ private JobSubmissionFault errorReason(int errCode, JobSubmissionFault error) {
+ if (errCode == 8) {
+ error.setReason(JobSubmissionFault.JOB_CANCEL);
+ } else {
+ error.setReason(JobSubmissionFault.JOB_FAILED + " With Exit Code:" + job.getExitCode());
+ }
+ return error;
+ }
+
public Map<String, ?> processOutput(InvocationContext invocationContext) throws ProviderException {
GlobusHostType host = (GlobusHostType) invocationContext.getExecutionDescription().getHost().getType();
ApplicationDeploymentDescriptionType app = invocationContext.getExecutionDescription().getApp().getType();
@@ -274,18 +262,13 @@ public class GramProvider extends Abstra
if (hostgridFTP == null || hostgridFTP.length == 0) {
hostgridFTP = new String[] { host.getHostAddress() };
}
-
boolean success = false;
ProviderException pe = new ProviderException("");
-
for (String endpoint : host.getGridFTPEndPointArray()) {
-
try {
-
/*
- * Stdout and Stderror
+ * Read Stdout and Stderror
*/
-
URI stdoutURI = GfacUtils.createGsiftpURI(endpoint, app.getStandardOutput());
URI stderrURI = GfacUtils.createGsiftpURI(endpoint, app.getStandardError());
@@ -297,7 +280,6 @@ public class GramProvider extends Abstra
logDir.mkdir();
}
- // Get the Stdouts and StdErrs
String timeStampedServiceName = GfacUtils.createUniqueNameForService(invocationContext
.getServiceName());
File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
@@ -305,40 +287,38 @@ public class GramProvider extends Abstra
String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
- Map<String,?> stringMap = null;
- // This is to handle exception during the output parsing.
- try{
- stringMap = OutputUtils.fillOutputFromStdout(invocationContext.<ActualParameter>getOutput(), stdout);
- }catch(Exception e){
- int errCode = listener.getError();
- String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress();
- JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
- gateKeeper, job.getRSL());
- if (errCode == 8) {
- error.setReason(JobSubmissionFault.JOB_CANCEL);
- } else {
- error.setReason(JobSubmissionFault.JOB_FAILED + " With Null Output Value :");
- }
- invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
- throw error;
- }
- MessageContext<Object> input = invocationContext.getOutput();
- for (Iterator<String> iterator = input.getNames(); iterator.hasNext(); ) {
+ Map<String,ActualParameter> stringMap = null;
+ MessageContext<Object> output = invocationContext.getOutput();
+ for (Iterator<String> iterator = output.getNames(); iterator.hasNext(); ) {
String paramName = iterator.next();
- String paramValue = input.getStringValue(paramName);
- if("".equals(paramValue) || paramValue == null){
+ ActualParameter actualParameter = (ActualParameter) output.getValue(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ URI outputURI = GfacUtils.createGsiftpURI(endpoint,app.getOutputDataDirectory());
+ List<String> outputList = ftp.listDir(outputURI,gssCred);
+ String[] valueList = outputList.toArray(new String[outputList.size()]);
+ ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
+ stringMap = new HashMap<String, ActualParameter>();
+ stringMap.put(paramName, actualParameter);
+ }
+ else{
+ // This is to handle exception during the output parsing.
+ stringMap = OutputUtils.fillOutputFromStdout(invocationContext.<ActualParameter>getOutput(), stdout);
+ String paramValue = output.getStringValue(paramName);
+ if(paramValue == null || paramValue.isEmpty()){
int errCode = listener.getError();
String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress();
JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
gateKeeper, job.getRSL());
- if (errCode == 8) {
- error.setReason(JobSubmissionFault.JOB_CANCEL);
- } else {
- error.setReason(JobSubmissionFault.JOB_FAILED + " With Exit Code:" + job.getExitCode());
- }
+ errorReason(errCode, error);
invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
throw error;
}
+ }
+ }
+ if(stringMap == null || stringMap.isEmpty()){
+ ProviderException exception = new ProviderException("Error creating job output");
+ invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,exception,exception.getLocalizedMessage());
+ throw exception;
}
// If users has given an output DAta poth we download the output files in to that directory, this will be apath in the machine where GFac is installed
if(WorkflowContextHeaderBuilder.getCurrentContextHeader()
@@ -353,7 +333,11 @@ public class GramProvider extends Abstra
}
}
return stringMap;
- } catch (ToolsException e) {
+ }catch (XmlException e) {
+ invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,e,e.getMessage());
+ throw new ProviderException(e.getMessage(), e);
+ }
+ catch (ToolsException e) {
invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,e,e.getMessage());
throw new ProviderException(e.getMessage(), e);
} catch (URISyntaxException e) {