You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ms...@apache.org on 2015/06/30 15:50:37 UTC

airavata git commit: some data staging modifications to seamlessly download and display job output files

Repository: airavata
Updated Branches:
  refs/heads/airavata-0.15-release-branch 2bab70875 -> a080ff7b2


some data staging modifications to seamlessly download and display job
output files

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a080ff7b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a080ff7b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a080ff7b

Branch: refs/heads/airavata-0.15-release-branch
Commit: a080ff7b2fcada5497a84a9effdff19570c24945
Parents: 2bab708
Author: msmemon <sh...@gmail.com>
Authored: Tue Jun 30 15:50:26 2015 +0200
Committer: msmemon <sh...@gmail.com>
Committed: Tue Jun 30 15:50:26 2015 +0200

----------------------------------------------------------------------
 .../gfac/bes/provider/impl/BESProvider.java     |   7 +-
 .../bes/security/UNICORESecurityContext.java    |  14 ++-
 .../gfac/bes/utils/DataTransferrer.java         | 105 +++++++++++--------
 .../gfac/bes/utils/UASDataStagingProcessor.java |  23 +++-
 4 files changed, 93 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 69e4915..97cf2d3 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -109,7 +109,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
 			log.debug("Security properties initialized.");
 		}
 	}
-
+	
 	public void execute(JobExecutionContext jobExecutionContext)
 			throws GFacProviderException, GFacException {
 
@@ -182,6 +182,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
             log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
             ActivityClient activityClient;
             activityClient = new ActivityClient(activityEpr, secProperties);
+            // now use the activity working directory property
             dt.setStorageClient(activityClient.getUspaceClient());
 
             if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
@@ -191,7 +192,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
                         + activityStatus.getFault().getFaultstring()
                         + "\n EXITCODE: " + activityStatus.getExitCode();
                 log.info(error);
-  
+                
                 JobState applicationJobStatus = JobState.FAILED;
                 sendNotification(jobExecutionContext,applicationJobStatus);
                 GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
@@ -221,6 +222,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
                     dt.downloadStdOuts();
                 }
             }
+            
+            dt.publishFinalOutputs();
         } catch (AppCatalogException e) {
             log.error("Error while retrieving UNICORE job submission..");
             throw new GFacProviderException("Error while retrieving UNICORE job submission..", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
index e82beeb..20c493f 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java
@@ -39,6 +39,7 @@ import org.bouncycastle.asn1.x500.style.BCStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import de.fzj.unicore.uas.security.ProxyCertOutHandler;
 import eu.emi.security.authn.x509.X509Credential;
 import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
 import eu.emi.security.authn.x509.impl.X500NameUtils;
@@ -73,11 +74,7 @@ public class UNICORESecurityContext extends X509SecurityContext {
 		catch (Exception e) {
 			throw new GFacException(e.getMessage(), e); 
 		} 
-		secProperties.getETDSettings().setExtendTrustDelegation(true);
 		if(enableMessageLogging) secProperties.setMessageLogging(true);
-//		secProperties.setMessageLogging(true);
-//		secProperties.setDoSignMessage(true);
-		secProperties.getETDSettings().setIssuerCertificateChain(secProperties.getCredential().getCertificateChain());
 		
 		return secProperties;
 	}
@@ -180,18 +177,19 @@ public class UNICORESecurityContext extends X509SecurityContext {
 		p.setProperty("http.connection.timeout", "5000");
 		p.setProperty("http.socket.timeout", "5000");
 		
-		secProperties.setExtraSettings(p);
-
 		if (outHandlers == null) {
 			outHandlerLst = new HashSet<String>();
 		} else {
 			outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
 		}
 
-		outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
-
+		outHandlerLst.add(ProxyCertOutHandler.class.getName());
+		
 		secProperties.setOutHandlerClassNames(outHandlerLst
 				.toArray(new String[outHandlerLst.size()]));
+		
+		secProperties.getETDSettings().setExtendTrustDelegation(true);
+
 	}
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index 7de0a11..0c6b7a2 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -46,17 +46,21 @@ import org.slf4j.LoggerFactory;
 
 import de.fzj.unicore.uas.client.StorageClient;
 
-
+/**
+ * Data movement utility class for transferring files before and after the job execution phase.   
+ * 
+ * */
 public class DataTransferrer {
-    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+   
+	protected final Logger log = LoggerFactory.getLogger(this.getClass());
 
-	private JobExecutionContext jobContext;
+	protected JobExecutionContext jobContext;
 	
-	private StorageClient storageClient;
+	protected StorageClient storageClient;
 	
-	private List<OutputDataObjectType> resultantOutputsLst;
+	protected List<OutputDataObjectType> resultantOutputsLst;
 	
-	private String downloadLocation, stdoutLocation, stderrLocation;
+	protected String gatewayDownloadLocation, stdoutLocation, stderrLocation;
 	
 	public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) {
 		this.jobContext = jobContext;
@@ -67,7 +71,7 @@ public class DataTransferrer {
 	
 	private void initStdoutsLocation() {
 
-		downloadLocation = getDownloadLocation();
+		gatewayDownloadLocation = getDownloadLocation();
 		
 		String stdout = jobContext.getStandardOutput();
 		String stderr = jobContext.getStandardError();
@@ -84,10 +88,14 @@ public class DataTransferrer {
 		String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
 				: stderr;
 		
-		stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+		stdoutLocation = gatewayDownloadLocation+File.separator+stdoutFileName;
 		
-		stderrLocation = downloadLocation+File.separator+stderrFileName;
-
+		stderrLocation = gatewayDownloadLocation+File.separator+stderrFileName;
+		
+		jobContext.addOutputFile(stdoutLocation);
+		jobContext.setStandardOutput(stdoutLocation);
+		jobContext.addOutputFile(stderrLocation);
+		jobContext.setStandardError(stderrLocation);
 		
 	}
 	
@@ -121,7 +129,7 @@ public class DataTransferrer {
 	public void downloadRemoteFiles() throws GFacProviderException {
 		
 		if(log.isDebugEnabled()) {
-			log.debug("Download location is:"+downloadLocation);
+			log.debug("Download location is:"+gatewayDownloadLocation);
 		}
 		
 		List<OutputDataObjectType> applicationOutputs = jobContext.getTaskData().getApplicationOutputs();
@@ -130,31 +138,37 @@ public class DataTransferrer {
 				if("".equals(output.getValue()) || output.getValue() == null) {
 					continue;
 				}
-
-	           	if(output.getType().equals(DataType.STRING)) {
+	           	if(output.getType().equals(DataType.STDOUT)) {
+	           		output.setValue(jobContext.getStandardOutput());
+	           		resultantOutputsLst.add(output);
+	           	}
+	           	
+	           	else if(output.getType().equals(DataType.STDERR)) {
+	           		output.setValue(jobContext.getStandardError());
+	           		resultantOutputsLst.add(output);
+	           	}
+				else if(output.getType().equals(DataType.STRING)) {
 					String value = output.getValue();
-					String outputPath = downloadLocation + File.separator + value;
+					String outputPath = gatewayDownloadLocation + File.separator + value;
 					FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite);
 					try {
 						fileDownloader.perform(storageClient);
+						output.setType(DataType.URI);
+						output.setValue(outputPath);
+						jobContext.addOutputFile(outputPath);
+						resultantOutputsLst.add(output);
 					} catch (Exception e) {
-						log.error("Error downloading remote files..");
+						log.error("Error downloading "+value+" from job working directory. ");
 						throw new GFacProviderException(e.getLocalizedMessage(),e);
 					}
-					resultantOutputsLst.add(output);
-					jobContext.addOutputFile(outputPath);
 	           	}
 	           	
-	           	if(output.getType().equals(DataType.STDOUT)) {
-	           		resultantOutputsLst.add(output);
-	           	}
-	           	
-	           	if(output.getType().equals(DataType.STDERR)) {
-	           		resultantOutputsLst.add(output);
-	           	}
             }
+            
 		 }
-		downloadStdOuts();
+		 
+		 downloadStdOuts();
+
 	}
 	
 	public void downloadStdOuts()  throws GFacProviderException{
@@ -163,41 +177,48 @@ public class DataTransferrer {
 		
 		String stderrFileName = new File(stderrLocation).getName();
 		
-		FileDownloader f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
+		FileDownloader f1 = null;  
 		try {
-			f1.perform(storageClient);
 			log.info("Downloading stdout and stderr..");
-			String stdoutput = readFile(stdoutLocation);
-			jobContext.addOutputFile(stdoutLocation);
-			jobContext.setStandardOutput(stdoutLocation);
 			log.info(stdoutFileName + " -> "+stdoutLocation);
+			
+			f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
+			f1.perform(storageClient);
+			String stdoutput = readFile(stdoutLocation);
+
+			log.info(stderrFileName + " -> " + stderrLocation);
+			f1.setFrom(stderrFileName);
+			f1.setTo(stderrLocation);
+			f1.perform(storageClient);
+			String stderror = readFile(stderrLocation);
+
 			if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) {
 				String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
-				String scriptCodeLocation = downloadLocation+File.separator+scriptExitCodeFName;
+				String scriptCodeLocation = gatewayDownloadLocation+File.separator+scriptExitCodeFName;
 				f1.setFrom(scriptExitCodeFName);
 				f1.setTo(scriptCodeLocation);
 				f1.perform(storageClient);
 				jobContext.addOutputFile(scriptCodeLocation);
 				log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation);
+				log.info("EXIT CODE: "+ readFile(scriptCodeLocation)); 
 			}
-			
-			f1.setFrom(stderrFileName);
-			f1.setTo(stderrLocation);
-			f1.perform(storageClient);
-			String stderror = readFile(stderrLocation);
-			jobContext.addOutputFile(stderrLocation);
-			jobContext.setStandardError(stderrLocation);
-			log.info(stderrFileName + " -> " + stderrLocation);
 		} catch (Exception e) {
 			throw new GFacProviderException(e.getLocalizedMessage(),e);
 		}
 		
-		publishFinalOutputs();
 	}
 	
-	protected void publishFinalOutputs() throws GFacProviderException {
+	/**
+	 * This method should be called once all the output files are successfully 
+	 * transferred from the remote Unicore endpoint.
+	 * Its access is made public to give clients a room for 
+	 * graceful invocation after the required outputs are downloaded. 
+	 * 
+	 * */
+	public void publishFinalOutputs() throws GFacProviderException {
         try {
-        	if(!resultantOutputsLst.isEmpty()) { 
+        	if(!resultantOutputsLst.isEmpty()) {
+        		log.debug("Publishing the list of outputs to the registry instance..");
 	        	Registry registry = jobContext.getRegistry();
 				registry.add(ChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, jobContext.getExperimentID());
         	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a080ff7b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
index 5de9593..0b6d62c 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/UASDataStagingProcessor.java
@@ -23,15 +23,21 @@ package org.apache.airavata.gfac.bes.utils;
 
 import java.io.File;
 import java.util.List;
+
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.model.appcatalog.appinterface.DataType;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class UASDataStagingProcessor {
 	
+	protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	
 	public static void generateDataStagingElements(JobDefinitionType value, JobExecutionContext context, String smsUrl) throws Exception{
 		smsUrl = "BFT:"+smsUrl;
 		
@@ -44,10 +50,11 @@ public class UASDataStagingProcessor {
 		}
 	}
 	
-	private static void createInURISMSElement(JobDefinitionType value, String smsUrl, String uri)
+	private static void createInURISMSElement(JobDefinitionType value, String smsUrl, String uri, boolean useSMS)
 			throws Exception {
 		String fileName = new File(uri).getName();
-		if (uri.startsWith("file")) {
+		
+		if (useSMS && uri.startsWith("file:")) {
 			uri = smsUrl+"#/"+fileName;
 			
 		} 
@@ -136,9 +143,17 @@ public class UASDataStagingProcessor {
 					continue;
 				}
 				if(input.getType().equals(DataType.URI)){
-					createInURISMSElement(value, smsUrl, input.getValue());
+					createInURISMSElement(value, smsUrl, input.getValue(), false);
+				}
+				else if(input.getType().equals(DataType.STRING) && input.isDataStaged()){
+					System.out.println("INPUT DATA VALUE-SBZ: "+input.getValue());
+					System.out.println("INPUT DATA flag-SBZ: "+input.isDataStaged());
+
+					createInURISMSElement(value, smsUrl, input.getValue(), false);
 				}
-				else if(input.getType().equals(DataType.STRING)){
+				else if(input.getType().equals(DataType.STRING) && !input.isDataStaged()){
+					System.out.println("INPUT DATA value for arg-SBZ: "+input.getValue());
+					System.out.println("INPUT DATA flag for arg-SBZ: "+input.isDataStaged());
 					ApplicationProcessor.addApplicationArgument(value, context, input.getValue());
 				}
 				else if (input.getType().equals(DataType.FLOAT) || input.getType().equals(DataType.INTEGER)){