You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ms...@apache.org on 2015/06/23 15:10:38 UTC

airavata git commit: Trying to push outputs to registry so that PGA can show the outputs, still in progress..

Repository: airavata
Updated Branches:
  refs/heads/airavata-0.15-release-branch f18e34307 -> 1951bfd10


Trying to push outputs to registry so that PGA can show the outputs,
still in progress..

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1951bfd1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1951bfd1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1951bfd1

Branch: refs/heads/airavata-0.15-release-branch
Commit: 1951bfd107bb0811516415cddb720183ca6c92a4
Parents: f18e343
Author: msmemon <sh...@gmail.com>
Authored: Tue Jun 23 15:10:27 2015 +0200
Committer: msmemon <sh...@gmail.com>
Committed: Tue Jun 23 15:10:27 2015 +0200

----------------------------------------------------------------------
 .../gfac/bes/provider/impl/BESProvider.java     |   6 +-
 .../gfac/bes/utils/DataTransferrer.java         | 174 +++++++++----------
 2 files changed, 85 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 3ed08cb..69e4915 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -49,6 +49,7 @@ import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.cpi.ChildDataType;
 import org.apache.xmlbeans.XmlCursor;
 import org.bouncycastle.asn1.x500.style.BCStyle;
 import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
@@ -141,8 +142,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
             
             log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
             
-            
-
             // upload files if any
             DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
             dt.uploadLocalFiles();
@@ -228,7 +227,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
         } catch (Exception e) {
             log.error("Cannot create storage..");
             throw new GFacProviderException("Cannot create storage..", e);
-        } 
+        }
+        
     }
 	
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index 2dbf637..7de0a11 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -26,25 +26,21 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.DataType;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,11 +54,42 @@ public class DataTransferrer {
 	
 	private StorageClient storageClient;
 	
+	private List<OutputDataObjectType> resultantOutputsLst;
+	
+	private String downloadLocation, stdoutLocation, stderrLocation;
+	
 	public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) {
 		this.jobContext = jobContext;
 		this.storageClient = storageClient;
+		resultantOutputsLst = new ArrayList<OutputDataObjectType>();
+		initStdoutsLocation();
 	}
 	
+	private void initStdoutsLocation() {
+
+		downloadLocation = getDownloadLocation();
+		
+		String stdout = jobContext.getStandardOutput();
+		String stderr = jobContext.getStandardError();
+		if(stdout != null) {
+			stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+		}
+		
+		if(stderr != null) {
+			stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+		}
+		
+		String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+				: stdout;
+		String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
+				: stderr;
+		
+		stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+		
+		stderrLocation = downloadLocation+File.separator+stderrFileName;
+
+		
+	}
 	
 	public void uploadLocalFiles() throws GFacProviderException {
 		List<String> inFilePrms = extractInFileParams();
@@ -86,43 +113,13 @@ public class DataTransferrer {
 		
 	}
 	
+
 	/**
 	 * This method will download all the remote files specified in the output 
 	 * context of a job.  
 	 * */
 	public void downloadRemoteFiles() throws GFacProviderException {
 		
-		String downloadLocation = getDownloadLocation();
-		
-		File file = new File(downloadLocation);
-		if(!file.exists()){
-			file.mkdirs();	
-		}
-
-//		Map<String, Object> output = jobContext.getOutMessageContext().getParameters();
-//        Set<String> keys = output.keySet();
-//        
-//		for (String outPrm : keys) {
-//			OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(outPrm);
-//				if (DataType.STDERR == actualParameter.getType() ||
-//					DataType.STDOUT == actualParameter.getType() ||
-//					DataType.URI == actualParameter.getType())  {
-//					continue;
-//				}
-//				
-//				String value = actualParameter.getValue();
-//				FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite);
-//				try {
-//					fileDownloader.perform(storageClient);
-//					String outputPath = downloadLocation + File.separator + value.substring(value.lastIndexOf('/')+1);
-//					actualParameter.setValue(outputPath);
-//					actualParameter.setType(DataType.URI);
-//					jobContext.addOutputFile(outputPath);
-//				} catch (Exception e) {
-//					throw new GFacProviderException(e.getLocalizedMessage(),e);
-//				}
-//		}
-		
 		if(log.isDebugEnabled()) {
 			log.debug("Download location is:"+downloadLocation);
 		}
@@ -136,48 +133,36 @@ public class DataTransferrer {
 
 	           	if(output.getType().equals(DataType.STRING)) {
 					String value = output.getValue();
-					FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite);
+					String outputPath = downloadLocation + File.separator + value;
+					FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite);
 					try {
 						fileDownloader.perform(storageClient);
-						String outputPath = downloadLocation + File.separator + value;
-						jobContext.addOutputFile(outputPath);
 					} catch (Exception e) {
-						log.error("Cannot download remote files..");
+						log.error("Error downloading remote files..");
 						throw new GFacProviderException(e.getLocalizedMessage(),e);
 					}
+					resultantOutputsLst.add(output);
+					jobContext.addOutputFile(outputPath);
+	           	}
+	           	
+	           	if(output.getType().equals(DataType.STDOUT)) {
+	           		resultantOutputsLst.add(output);
+	           	}
+	           	
+	           	if(output.getType().equals(DataType.STDERR)) {
+	           		resultantOutputsLst.add(output);
 	           	}
             }
 		 }
-		
 		downloadStdOuts();
 	}
 	
-	
 	public void downloadStdOuts()  throws GFacProviderException{
-		String downloadLocation = getDownloadLocation();
-		File file = new File(downloadLocation);
-		if(!file.exists()){
-			file.mkdirs();	
-		}
 		
-		String stdout = jobContext.getStandardOutput();
-		String stderr = jobContext.getStandardError();
-		if(stdout != null) {
-			stdout = stdout.substring(stdout.lastIndexOf('/')+1);
-		}
+		String stdoutFileName = new File(stdoutLocation).getName();
 		
-		if(stderr != null) {
-			stderr = stderr.substring(stderr.lastIndexOf('/')+1);
-		}
+		String stderrFileName = new File(stderrLocation).getName();
 		
-		String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
-				: stdout;
-		String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
-				: stderr;
-		
-		ApplicationDeploymentDescription application = jobContext.getApplicationContext().getApplicationDeploymentDescription();
-		
-		String stdoutLocation = downloadLocation+File.separator+stdoutFileName;
 		FileDownloader f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
 		try {
 			f1.perform(storageClient);
@@ -185,47 +170,45 @@ public class DataTransferrer {
 			String stdoutput = readFile(stdoutLocation);
 			jobContext.addOutputFile(stdoutLocation);
 			jobContext.setStandardOutput(stdoutLocation);
-			log.info("Stdout downloaded to -> "+stdoutLocation);
+			log.info(stdoutFileName + " -> "+stdoutLocation);
 			if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) {
 				String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
 				String scriptCodeLocation = downloadLocation+File.separator+scriptExitCodeFName;
 				f1.setFrom(scriptExitCodeFName);
 				f1.setTo(scriptCodeLocation);
 				f1.perform(storageClient);
-				log.info("UNICORE_SCRIPT_EXIT_CODE downloaded to "+scriptCodeLocation);
+				jobContext.addOutputFile(scriptCodeLocation);
+				log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation);
 			}
-			String stderrLocation = downloadLocation+File.separator+stderrFileName;
+			
 			f1.setFrom(stderrFileName);
 			f1.setTo(stderrLocation);
 			f1.perform(storageClient);
 			String stderror = readFile(stderrLocation);
 			jobContext.addOutputFile(stderrLocation);
 			jobContext.setStandardError(stderrLocation);
-			log.info("Stderr downloaded to -> "+stderrLocation);
+			log.info(stderrFileName + " -> " + stderrLocation);
 		} catch (Exception e) {
-			
 			throw new GFacProviderException(e.getLocalizedMessage(),e);
 		}
 		
+		publishFinalOutputs();
 	}
 	
-	public List<String> extractOutParams(JobExecutionContext context) {
-		List<String> outPrmsList = new ArrayList<String>();
-		List<OutputDataObjectType> applicationOutputs = jobContext.getTaskData().getApplicationOutputs();
-		 if (applicationOutputs != null && !applicationOutputs.isEmpty()){
-           for (OutputDataObjectType output : applicationOutputs){
-          	 if(output.getType().equals(DataType.STRING)) {
-          		outPrmsList.add(output.getValue());
-          	 }
-          	 else if(output.getType().equals(DataType.FLOAT) || output.getType().equals(DataType.INTEGER)) {
-          		outPrmsList.add(String.valueOf(output.getValue()));
-          		 
-          	 }
-           }
-		 }
-		return outPrmsList;
+	protected void publishFinalOutputs() throws GFacProviderException {
+        try {
+        	if(!resultantOutputsLst.isEmpty()) { 
+	        	Registry registry = jobContext.getRegistry();
+				registry.add(ChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, jobContext.getExperimentID());
+        	}
+		} catch (RegistryException e) {
+			throw new GFacProviderException("Cannot publish outputs to the registry.");
+		}
+
+		
 	}
 	
+	
 	public List<String> extractInFileParams() {
 		List<String> filePrmsList = new ArrayList<String>();
 		List<InputDataObjectType> applicationInputs = jobContext.getTaskData().getApplicationInputs();
@@ -274,11 +257,6 @@ public class DataTransferrer {
 
 			else {
 				
-				try {
-					URI u = new URI(outputDataDir);
-				} catch (URISyntaxException e) {
-					return getTempPath(jobContext.getExperimentID());
-				}
 				// in case of remote locations use the tmp location
 				if (outputDataDir.startsWith("scp:") || 
 						outputDataDir.startsWith("ftp:") ||
@@ -289,12 +267,24 @@ public class DataTransferrer {
 							outputDataDir = getTempPath(jobContext.getExperimentID());
 					
 				} else {
-					outputDataDir = taskData.getAdvancedOutputDataHandling()
-							.getOutputDataDir();
+					try {
+						URI u = new URI(outputDataDir);
+						outputDataDir = u.getPath();
+					} catch (URISyntaxException e) {
+						outputDataDir = getTempPath(jobContext.getExperimentID());
+					}
+
+					
 				}
 			}
 		}
+		
+		File file = new File(outputDataDir);
+		if(!file.exists()){
+			file.mkdirs();	
+		}
 
+		
 		return outputDataDir;
 	}