You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2012/05/31 23:28:45 UTC

svn commit: r1344920 - /incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java

Author: raminder
Date: Thu May 31 21:28:44 2012
New Revision: 1344920

URL: http://svn.apache.org/viewvc?rev=1344920&view=rev
Log:
Fixed AIRAVATA-436 (handle URIArray output types) and AIRAVATA-442 (GRAM error handling)

Modified:
    incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java

Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java?rev=1344920&r1=1344919&r2=1344920&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/impl/GramProvider.java Thu May 31 21:28:44 2012
@@ -28,6 +28,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +54,7 @@ import org.apache.airavata.schemas.gfac.
 import org.apache.airavata.schemas.gfac.URIArrayType;
 import org.apache.airavata.schemas.gfac.URIParameterType;
 import org.apache.airavata.schemas.wec.WorkflowOutputDataHandlingDocument;
+import org.apache.xmlbeans.XmlException;
 import org.globus.gram.GramAttributes;
 import org.globus.gram.GramException;
 import org.globus.gram.GramJob;
@@ -173,8 +175,8 @@ public class GramProvider extends Abstra
 
             buf.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
                     .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
-                    .append(" tempDirectory = ").append(app.getScratchWorkingDirectory())
-                    .append(" Globus GateKeeper cantact = ").append(gateKeeper);
+                    .append(" temp directory = ").append(app.getScratchWorkingDirectory())
+                    .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
             invocationContext.getExecutionContext().getNotifier().info(invocationContext, buf.toString());
 
             /*
@@ -209,43 +211,20 @@ public class GramProvider extends Abstra
              */
             int jobStatus = listener.getStatus();
 
-            if(job.getExitCode() != 0){
+            if(job.getExitCode() != 0 || jobStatus == GramJob.STATUS_FAILED){
                 int errCode = listener.getError();
                 String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress() + " Job Exit Code = "
-                        + job.getExitCode();
+                        + listener.getError();
                 JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
                         gateKeeper, job.getRSL());
-                if (errCode == 8) {
-                    error.setReason(JobSubmissionFault.JOB_CANCEL);
-                } else {
-                    error.setReason(JobSubmissionFault.JOB_FAILED + " With Exit Code:" + job.getExitCode());
-                }
-                invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
-                throw error;
-            }
-            if (jobStatus == GramJob.STATUS_FAILED) {
-                int errCode = listener.getError();
-                String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress() + " Error Code = "
-                        + errCode;
-                JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
-                        gateKeeper, job.getRSL());
-                if (errCode == 8) {
-                    error.setReason(JobSubmissionFault.JOB_CANCEL);
-                } else {
-                    error.setReason(JobSubmissionFault.JOB_FAILED);
-                }
+                errorReason(errCode, error);
                 invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
                 throw error;
             }
-
-        } catch (GramException e) {
+         } catch (GramException e) {
             JobSubmissionFault error = new JobSubmissionFault(this, e, host.getHostAddress(), gateKeeper, job.getRSL());
-            if (listener.getError() == 8) {
-                error.setReason(JobSubmissionFault.JOB_CANCEL);
-            } else {
-                error.setReason(JobSubmissionFault.JOB_FAILED);
-            }
-            throw error;
+            int errCode = listener.getError();
+		    throw errorReason(errCode, error);
         } catch (GSSException e) {
             throw new ProviderException(e.getMessage(), e);
         } catch (InterruptedException e) {
@@ -263,6 +242,15 @@ public class GramProvider extends Abstra
 
     }
 
+	private JobSubmissionFault errorReason(int errCode, JobSubmissionFault error) {
+		if (errCode == 8) {
+		    error.setReason(JobSubmissionFault.JOB_CANCEL);
+		} else {
+		    error.setReason(JobSubmissionFault.JOB_FAILED + " With Exit Code:" + job.getExitCode());
+		}
+		return error;
+	}
+
     public Map<String, ?> processOutput(InvocationContext invocationContext) throws ProviderException {
         GlobusHostType host = (GlobusHostType) invocationContext.getExecutionDescription().getHost().getType();
         ApplicationDeploymentDescriptionType app = invocationContext.getExecutionDescription().getApp().getType();
@@ -274,18 +262,13 @@ public class GramProvider extends Abstra
             if (hostgridFTP == null || hostgridFTP.length == 0) {
                 hostgridFTP = new String[] { host.getHostAddress() };
             }
-
             boolean success = false;
             ProviderException pe = new ProviderException("");
-
             for (String endpoint : host.getGridFTPEndPointArray()) {
-
                 try {
-
                     /*
-                     * Stdout and Stderror
+                     *  Read Stdout and Stderror
                      */
-
                     URI stdoutURI = GfacUtils.createGsiftpURI(endpoint, app.getStandardOutput());
                     URI stderrURI = GfacUtils.createGsiftpURI(endpoint, app.getStandardError());
 
@@ -297,7 +280,6 @@ public class GramProvider extends Abstra
                         logDir.mkdir();
                     }
 
-                    // Get the Stdouts and StdErrs
                     String timeStampedServiceName = GfacUtils.createUniqueNameForService(invocationContext
                             .getServiceName());
                     File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
@@ -305,40 +287,38 @@ public class GramProvider extends Abstra
 
                     String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
                     String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
-                    Map<String,?> stringMap = null;
-                    // This is to handle exception during the output parsing.
-                    try{
-                         stringMap = OutputUtils.fillOutputFromStdout(invocationContext.<ActualParameter>getOutput(), stdout);
-                    }catch(Exception e){
-                        int errCode = listener.getError();
-                            String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress();
-                            JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
-                                    gateKeeper, job.getRSL());
-                            if (errCode == 8) {
-                                error.setReason(JobSubmissionFault.JOB_CANCEL);
-                            } else {
-                                error.setReason(JobSubmissionFault.JOB_FAILED + " With Null Output Value :");
-                            }
-                            invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
-                            throw error;
-                    }
-                    MessageContext<Object> input = invocationContext.getOutput();
-                    for (Iterator<String> iterator = input.getNames(); iterator.hasNext(); ) {
+                    Map<String,ActualParameter> stringMap = null;
+                    MessageContext<Object> output = invocationContext.getOutput();
+                    for (Iterator<String> iterator = output.getNames(); iterator.hasNext(); ) {
                         String paramName = iterator.next();
-                        String paramValue = input.getStringValue(paramName);
-                        if("".equals(paramValue) || paramValue == null){
+                        ActualParameter actualParameter = (ActualParameter) output.getValue(paramName);
+						if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+							URI outputURI = GfacUtils.createGsiftpURI(endpoint,app.getOutputDataDirectory());
+							List<String> outputList = ftp.listDir(outputURI,gssCred);
+							String[] valueList = outputList.toArray(new String[outputList.size()]);
+							((URIArrayType) actualParameter.getType()).setValueArray(valueList);
+							stringMap = new HashMap<String, ActualParameter>();
+							stringMap.put(paramName, actualParameter);
+						}
+                    	else{
+                    	// This is to handle exception during the output parsing.
+                        stringMap = OutputUtils.fillOutputFromStdout(invocationContext.<ActualParameter>getOutput(), stdout);
+                        String paramValue = output.getStringValue(paramName);
+                        if(paramValue == null || paramValue.isEmpty()){
                             int errCode = listener.getError();
                             String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress();
                             JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
                                     gateKeeper, job.getRSL());
-                            if (errCode == 8) {
-                                error.setReason(JobSubmissionFault.JOB_CANCEL);
-                            } else {
-                                error.setReason(JobSubmissionFault.JOB_FAILED + " With Exit Code:" + job.getExitCode());
-                            }
+                            errorReason(errCode, error);
                             invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,error,errorMsg);
                             throw error;
                         }
+                        }
+                    }
+                    if(stringMap == null || stringMap.isEmpty()){
+                    	ProviderException exception = new ProviderException("Error creating job output");
+                    	 invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,exception,exception.getLocalizedMessage());
+                         throw exception;
                     }
                     // If users has given an output DAta poth we download the output files in to that directory, this will be apath in the machine where GFac is installed
                     if(WorkflowContextHeaderBuilder.getCurrentContextHeader()
@@ -353,7 +333,11 @@ public class GramProvider extends Abstra
                         }
                     }
                     return stringMap;
-                } catch (ToolsException e) {
+                }catch (XmlException e) {
+                    invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,e,e.getMessage());
+                    throw new ProviderException(e.getMessage(), e);
+                }
+                catch (ToolsException e) {
                     invocationContext.getExecutionContext().getNotifier().executionFail(invocationContext,e,e.getMessage());
                     throw new ProviderException(e.getMessage(), e);
                 } catch (URISyntaxException e) {