You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2013/12/13 17:22:02 UTC
svn commit: r1550762 - in
/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac:
notification/events/ provider/utils/
Author: raminder
Date: Fri Dec 13 16:22:01 2013
New Revision: 1550762
URL: http://svn.apache.org/r1550762
Log:
updated to fix AIRAVATA-965
Added:
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/UnicoreJobIDEvent.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/DataTransferrer.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileDownloader.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileTransferBase.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileUploader.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/Mode.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/StorageCreator.java
airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/UASDataStagingProcessor.java
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/UnicoreJobIDEvent.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/UnicoreJobIDEvent.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/UnicoreJobIDEvent.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/UnicoreJobIDEvent.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.notification.events;
+
+/**
+ * Notification event that carries a status message for a UNICORE job.
+ * The event type is set to this class's simple name.
+ */
+public class UnicoreJobIDEvent extends GFacEvent {
+
+    /** Message supplied when the event was created. */
+    String statusMessage;
+
+    /**
+     * Creates the event and tags it with its own simple class name.
+     *
+     * @param message the status message to carry
+     */
+    public UnicoreJobIDEvent(String message) {
+        this.eventType = UnicoreJobIDEvent.class.getSimpleName();
+        this.statusMessage = message;
+    }
+
+    /**
+     * @return the message passed to the constructor
+     */
+    public String getStatusMessage() {
+        return this.statusMessage;
+    }
+}
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/DataTransferrer.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/DataTransferrer.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/DataTransferrer.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/DataTransferrer.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,230 @@
+package org.apache.airavata.gfac.provider.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.StringArrayType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.wec.ApplicationOutputDataHandlingDocument.ApplicationOutputDataHandling;
+import org.apache.airavata.schemas.wec.ContextHeaderDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import de.fzj.unicore.uas.client.StorageClient;
+
+
+/**
+ * Moves job data between the local machine and a UNICORE storage:
+ * local <code>file</code> URIs from the input message context are uploaded below
+ * <code>input/</code>, and the outputs named in the output message context (plus
+ * the application's stdout/stderr files) are downloaded from <code>output/</code>.
+ */
+public class DataTransferrer {
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private JobExecutionContext jobContext;
+
+    private StorageClient storageClient;
+
+    /**
+     * @param jobContext    context providing the in/out message parameters and application description
+     * @param storageClient storage that files are uploaded to and downloaded from
+     */
+    public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) {
+        this.jobContext = jobContext;
+        this.storageClient = storageClient;
+    }
+
+    /**
+     * Uploads every input parameter of type <code>URI</code> whose value starts with
+     * <code>file</code> to <code>input/&lt;file name&gt;</code> on the storage, overwriting
+     * any existing remote file. Other parameters are ignored.
+     *
+     * @throws GFacProviderException if a local file is missing or an upload fails
+     */
+    public void uploadLocalFiles() throws GFacProviderException {
+        Map<String, Object> inputParams = jobContext.getInMessageContext()
+                .getParameters();
+        for (Object value : inputParams.values()) {
+            ActualParameter inParam = (ActualParameter) value;
+            String paramDataType = inParam.getType().getType().toString();
+            if (!"URI".equals(paramDataType)) {
+                continue;
+            }
+            String uri = ((URIParameterType) inParam.getType()).getValue();
+            if (!uri.startsWith("file")) {
+                continue;
+            }
+            String fileName = new File(uri).getName();
+            try {
+                // Drops everything up to and including the ':' of the last "://".
+                // NOTE(review): a URI of the form "file:/path" contains no "://" and is
+                // passed on unchanged, scheme included - confirm whether that form can occur.
+                String uriWithoutProtocol = uri.substring(
+                        uri.lastIndexOf("://") + 1, uri.length());
+                FileUploader fileUploader = new FileUploader(
+                        uriWithoutProtocol, "input/" + fileName,
+                        Mode.overwrite);
+                fileUploader.perform(storageClient);
+            } catch (FileNotFoundException e3) {
+                throw new GFacProviderException(
+                        "Error while staging-in, local file "+fileName+" not found", e3);
+            } catch (Exception e) {
+                throw new GFacProviderException("Cannot upload files", e);
+            }
+        }
+    }
+
+    /**
+     * This method will download all the remote files specified according to the output
+     * context of a job, rewrite the corresponding output parameters to the local paths,
+     * and finally download the stdout/stderr files.
+     *
+     * @throws GFacProviderException if no download location is configured, a download
+     *         fails, or no output parameter produced a file
+     */
+    public void downloadRemoteFiles() throws GFacProviderException {
+
+        String downloadLocation = prepareDownloadDirectory();
+
+        // Only used to detect whether at least one output was retrieved.
+        Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+
+        Map<String, Object> outputParams = jobContext.getOutMessageContext()
+                .getParameters();
+
+        for (Map.Entry<String, Object> entry : outputParams.entrySet()) {
+            String paramKey = entry.getKey();
+            ActualParameter outParam = (ActualParameter) entry.getValue();
+            String paramDataType = outParam.getType().getType().toString();
+
+            if ("String".equals(paramDataType)) {
+                StringParameterType stringType = (StringParameterType) outParam.getType();
+                String stringPrm = stringType.getValue();
+                String localFileName = null;
+                //TODO: why analysis.tar? it wont scale to gateways..
+                if (stringPrm == null || stringPrm.isEmpty()) {
+                    // NOTE(review): the remote path below is still built from the
+                    // null/empty value ("output/null" or "output/") - confirm the intended source.
+                    localFileName = "analysis-results.tar";
+                } else {
+                    localFileName = stringPrm.substring(stringPrm.lastIndexOf("/")+1);
+                }
+                String outputLocation = downloadLocation+File.separator+localFileName;
+                fetch("output/"+stringPrm, outputLocation);
+                stringType.setValue(outputLocation);
+                stringMap.put(paramKey, outParam);
+            }
+
+            else if ("StringArray".equals(paramDataType)) {
+                // The parameter is a StringArrayType; casting it to StringParameterType
+                // (as was done before) throws ClassCastException on the first element.
+                StringArrayType arrayType = (StringArrayType) outParam.getType();
+                String[] valueArray = arrayType.getValueArray();
+                String[] localPaths = new String[valueArray.length];
+                for (int i = 0; i < valueArray.length; i++) {
+                    String v = valueArray[i];
+                    String localFileName = v.substring(v.lastIndexOf("/")+1);
+                    String outputLocation = downloadLocation+File.separator+localFileName;
+                    fetch("output/"+v, outputLocation);
+                    localPaths[i] = outputLocation;
+                }
+                if (localPaths.length > 0) {
+                    // Standard XMLBeans array setter, the counterpart of getValueArray().
+                    arrayType.setValueArray(localPaths);
+                    stringMap.put(paramKey, outParam);
+                }
+            }
+        }
+        if (stringMap.isEmpty()) {
+            throw new GFacProviderException("Empty Output returned from the Application, Double check the application " +
+                    "and ApplicationDescriptor output Parameter Names");
+        }
+
+        downloadStdOuts();
+    }
+
+
+    /**
+     * Downloads the application's standard output and standard error files from
+     * <code>output/</code> and stores their <em>contents</em> in the application
+     * deployment description. The file names default to <code>stdout</code> and
+     * <code>stderr</code> when the description does not name them.
+     *
+     * @throws GFacProviderException if no download location is configured or a
+     *         download/read fails
+     */
+    public void downloadStdOuts() throws GFacProviderException{
+        String downloadLocation = prepareDownloadDirectory();
+
+        HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) jobContext
+                .getApplicationContext().getApplicationDeploymentDescription()
+                .getType();
+
+        String stdout = baseName(appDepType.getStandardOutput());
+        String stderr = baseName(appDepType.getStandardError());
+
+        String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+                : stdout;
+        // Check stderr itself for null (previously stdout was tested, risking an NPE).
+        String stderrFileName = (stderr == null || stderr.equals("")) ? "stderr"
+                : stderr;
+
+        ApplicationDescription application = jobContext.getApplicationContext().getApplicationDeploymentDescription();
+        ApplicationDeploymentDescriptionType appDesc = application.getType();
+
+        String stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+        appDesc.setStandardOutput(fetchAndRead("output/"+stdoutFileName, stdoutLocation));
+
+        String stderrLocation = downloadLocation+File.separator+stderrFileName;
+        appDesc.setStandardError(fetchAndRead("output/"+stderrFileName, stderrLocation));
+    }
+
+    /** Returns the part of <code>path</code> after the last '/', or null for null input. */
+    private String baseName(String path) {
+        if (path == null) {
+            return null;
+        }
+        return path.substring(path.lastIndexOf('/')+1);
+    }
+
+    /** Downloads one remote path to a local path, wrapping any failure in a GFacProviderException. */
+    private void fetch(String remotePath, String localPath) throws GFacProviderException {
+        FileDownloader fileDownloader = new FileDownloader(remotePath, localPath, Mode.overwrite);
+        try {
+            fileDownloader.perform(storageClient);
+        } catch (Exception e) {
+            throw new GFacProviderException(e.getLocalizedMessage(),e);
+        }
+    }
+
+    /** Downloads one remote file and returns the text content of the local copy. */
+    private String fetchAndRead(String remotePath, String localPath) throws GFacProviderException {
+        fetch(remotePath, localPath);
+        try {
+            return readFile(localPath);
+        } catch (Exception e) {
+            throw new GFacProviderException(e.getLocalizedMessage(),e);
+        }
+    }
+
+    /**
+     * Resolves the download location and creates the directory if it does not exist yet.
+     *
+     * @throws GFacProviderException if the job context does not configure an output data directory
+     */
+    private String prepareDownloadDirectory() throws GFacProviderException {
+        String downloadLocation = getDownloadLocation();
+        if (downloadLocation == null) {
+            // Fail with a clear message instead of an NPE from new File(null).
+            throw new GFacProviderException("No output data directory is configured for this job");
+        }
+        File directory = new File(downloadLocation);
+        if (!directory.exists()) {
+            directory.mkdirs();
+        }
+        return downloadLocation;
+    }
+
+    /**
+     * Reads a local text file line by line, terminating every line with
+     * {@link Constants#NEWLINE}. The reader is always closed.
+     */
+    private String readFile(String localFile) throws IOException {
+        BufferedReader instream = new BufferedReader(new FileReader(localFile));
+        StringBuilder buff = new StringBuilder();
+        try {
+            String temp = null;
+            while ((temp = instream.readLine()) != null) {
+                buff.append(temp);
+                buff.append(Constants.NEWLINE);
+            }
+        } finally {
+            instream.close();
+        }
+
+        log.info("finish read file:" + localFile);
+
+        return buff.toString();
+    }
+
+    /**
+     * Returns the output data directory of the first application output data handling
+     * entry in the context header, or null when the header or its entries are absent.
+     */
+    private String getDownloadLocation() {
+        ContextHeaderDocument.ContextHeader currentContextHeader = jobContext
+                .getContextHeader();
+        if (currentContextHeader == null
+                || currentContextHeader.getWorkflowOutputDataHandling() == null) {
+            return null;
+        }
+        ApplicationOutputDataHandling[] handlings = currentContextHeader
+                .getWorkflowOutputDataHandling()
+                .getApplicationOutputDataHandlingArray();
+        if (handlings == null || handlings.length == 0) {
+            return null;
+        }
+        return handlings[0].getOutputDataDirectory();
+    }
+}
\ No newline at end of file
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileDownloader.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileDownloader.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileDownloader.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileDownloader.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,235 @@
+package org.apache.airavata.gfac.provider.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import de.fzj.unicore.uas.fts.FiletransferOptions.SupportsPartialRead;
+
+/**
+ * helper that exports remote files from a UNICORE Storage
+ * to the local client machine.<br/>
+ * Simple wildcards ("*" and "?") and download of
+ * directories are supported.
+ *
+ * TODO this should be refactored so the single-file download logic
+ * is separated from the wildcard/directory/provided outputStream logic
+ *
+ * @author schuller
+ */
+public class FileDownloader extends FileTransferBase{
+
+ private boolean showProgress=true;
+
+ private boolean forceFileOnly=false;
+
+ private OutputStream targetStream=null;
+
+ public FileDownloader(String from, String to, Mode mode){
+ this(from,to,mode,true);
+ }
+
+ public FileDownloader(String from, String to, Mode mode, boolean failOnError){
+ this.to=to;
+ this.from=from;
+ this.mode=mode;
+ this.failOnError=failOnError;
+ }
+
+ public void perform(StorageClient sms)throws Exception{
+ boolean isWildcard=hasWildCards(from);
+ boolean isDirectory=false;
+ GridFileType gridSource=null;
+ if(isWildcard){
+ performWildCardExport(sms);
+ }
+ else {
+ //check if source is a directory
+ gridSource=sms.listProperties(from);
+ isDirectory=gridSource.getIsDirectory();
+ if(isDirectory){
+ if(forceFileOnly){
+ throw new IOException("Source is a directory");
+ }
+ performDirectoryExport(gridSource, new File(to), sms);
+ }
+ else{
+ download(gridSource,new File(to),sms);
+ }
+ }
+ }
+
+ protected void performDirectoryExport(GridFileType directory, File targetDirectory, StorageClient sms)throws Exception{
+ if(!targetDirectory.exists()|| !targetDirectory.canWrite()){
+ throw new IOException("Target directory <"+to+"> does not exist or is not writable!");
+ }
+ if(!targetDirectory.isDirectory()){
+ throw new IOException("Target <"+to+"> is not a directory!");
+ }
+ GridFileType[]gridFiles=sms.listDirectory(directory.getPath());
+ for(GridFileType file: gridFiles){
+ if(file.getIsDirectory()){
+ if(!recurse) {
+ System.out.println("Skipping directory "+file.getPath());
+ continue;
+ }
+ else{
+ File newTargetDirectory=new File(targetDirectory,getName(file.getPath()));
+ boolean success=newTargetDirectory.mkdirs();
+ if(!success)throw new IOException("Can create directory: "+newTargetDirectory.getAbsolutePath());
+ performDirectoryExport(file, newTargetDirectory, sms);
+ continue;
+ }
+ }
+ download(file, new File(targetDirectory,getName(file.getPath())), sms);
+ }
+ }
+
+ protected void performWildCardExport(StorageClient sms)throws Exception{
+ String dir=getDir(from);
+ if(dir==null)dir="/";
+ GridFileType[] files=sms.find(dir, false, from, false, null, null);
+ File targetDir=targetStream==null?new File(to):null;
+ if(targetStream==null){
+ if(!targetDir.isDirectory())throw new IOException("Target is not a directory.");
+ }
+ for(GridFileType f: files){
+ download(f, targetDir, sms);
+ }
+ }
+
+ private String getDir(String path){
+ return new File(path).getParent();
+ }
+
+ private String getName(String path){
+ return new File(path).getName();
+ }
+
+ /**
+ * download a single regular file
+ *
+ * @param source - grid file descriptor
+ * @param localFile - local file or directory to write to
+ * @param sms
+ * @throws Exception
+ */
+ private void download(GridFileType source, File localFile, StorageClient sms)throws Exception{
+ if(source==null || source.getIsDirectory()){
+ throw new IllegalStateException("Source="+source);
+ }
+
+ OutputStream os=targetStream!=null?targetStream:null;
+ FileTransferClient ftc=null;
+ try{
+ String path=source.getPath();
+ if(targetStream==null){
+ if(localFile.isDirectory()){
+ localFile=new File(localFile,getName(source.getPath()));
+ }
+ if(mode.equals(Mode.nooverwrite) && localFile.exists()){
+ System.out.println("File exists and creation mode was set to 'nooverwrite'.");
+ return;
+ }
+ System.out.println("Downloading remote file '"+sms.getUrl()+"#/"+path+"' -> "+localFile.getAbsolutePath());
+ os=new FileOutputStream(localFile.getAbsolutePath(), mode.equals(Mode.append));
+ }
+
+ chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+ Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+ ftc=sms.getExport(path,extraParameters,chosenProtocol);
+ configure(ftc, extraParameters);
+ System.out.println("DEB:File transfer URL : "+ftc.getUrl());
+// ProgressBar p=null;
+ if(ftc instanceof IMonitorable && showProgress){
+ long size=ftc.getSourceFileSize();
+ if(isRange()){
+ size=getRangeSize();
+ }
+// p=new ProgressBar(localFile.getName(),size,msg);
+// ((IMonitorable) ftc).setProgressListener(p);
+ }
+ long startTime=System.currentTimeMillis();
+ if(isRange()){
+ if(!(ftc instanceof SupportsPartialRead)){
+ throw new Exception("Byte range is defined but protocol does not allow " +
+ "partial read! Please choose a different protocol!");
+ }
+ System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+ SupportsPartialRead pReader=(SupportsPartialRead)ftc;
+ pReader.readPartial(startByte, endByte-startByte+1, os);
+ }
+ else{
+ ftc.readAllData(os);
+ }
+// if(p!=null){
+// p.finish();
+// }
+ if(timing){
+ long duration=System.currentTimeMillis()-startTime;
+ double rate=(double)localFile.length()/(double)duration;
+ System.out.println("Rate: " +rate+ " kB/sec.");
+ }
+ if(targetStream==null)copyProperties(source, localFile);
+ }
+ finally{
+ try{
+ if(targetStream==null && os!=null){
+ os.close();
+ }
+ }catch(Exception ignored){}
+ if(ftc!=null){
+ try{
+ ftc.destroy();
+ }catch(Exception e1){
+// System.out.println("Could not destroy the filetransfer client",e1);
+ }
+ }
+ }
+ }
+
+ /**
+ * if possible, copy the remote executable flag to the local file
+ * @throws Exception
+ */
+ private void copyProperties(GridFileType source, File localFile)throws Exception{
+ try{
+ localFile.setExecutable(source.getPermissions().getExecutable());
+ }
+ catch(Exception ex){
+ //TODO: logging
+// ("Can't set 'executable' flag for "+localFile.getName(), ex);
+ }
+ }
+
+ private void configure(FileTransferClient ftc, Map<String,String>params){
+ if(ftc instanceof UFTPFileTransferClient){
+ UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+ String secret=params.get(UFTPConstants.PARAM_SECRET);
+ u.setSecret(secret);
+ }
+ }
+
+ public void setShowProgress(boolean showProgress) {
+ this.showProgress = showProgress;
+ }
+
+ public void setForceFileOnly(boolean forceFileOnly) {
+ this.forceFileOnly = forceFileOnly;
+ }
+
+ public void setTargetStream(OutputStream targetStream) {
+ this.targetStream = targetStream;
+ }
+
+}
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileTransferBase.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileTransferBase.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileTransferBase.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileTransferBase.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,206 @@
+package org.apache.airavata.gfac.provider.utils;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.util.PropertyHelper;
+/**
+ * Common state and helpers for file uploads and downloads: source/target paths,
+ * creation mode, optional byte range, preferred protocols and wildcard handling.
+ */
+public class FileTransferBase {
+
+    protected Properties extraParameterSource;
+
+    protected boolean timing=false;
+
+    protected boolean recurse=false;
+
+    protected String from;
+
+    protected String to;
+
+    //index of first byte to download
+    protected Long startByte;
+
+    //index of last byte to download
+    protected Long endByte;
+
+    /**
+     * the creation mode
+     */
+    protected Mode mode;
+
+    /**
+     * whether the job processing should fail if an error occurs
+     */
+    protected boolean failOnError;
+
+    protected List<ProtocolType.Enum> preferredProtocols=new ArrayList<ProtocolType.Enum>();
+
+    /** the protocol selected for the most recent transfer, or null if none yet */
+    protected ProtocolType.Enum chosenProtocol=null;
+
+    /** Creates a transfer that prefers the BFT protocol. */
+    public FileTransferBase(){
+        preferredProtocols.add(ProtocolType.BFT);
+    }
+
+    /**
+     * Builds the extra parameters for a protocol from the configured properties.
+     *
+     * @param protocol the protocol whose parameters are wanted
+     * @return the filtered parameters; empty when no extra parameter source is set
+     */
+    protected Map<String,String>makeExtraParameters(ProtocolType.Enum protocol){
+        Map<String, String> res;
+        if(extraParameterSource==null){
+            res=new HashMap<String, String>();
+        }
+        else{
+            String p=String.valueOf(protocol);
+            PropertyHelper ph=new PropertyHelper(extraParameterSource, new String[]{p,p.toLowerCase()});
+            res= ph.getFilteredMap();
+        }
+        if(res.size()>0){
+            // TODO: change it to logger
+            System.out.println("Have "+res.size()+" extra parameters for protocol "+protocol);
+        }
+        return res;
+    }
+
+
+    public String getTo() {
+        return to;
+    }
+
+    public String getFrom() {
+        return from;
+    }
+
+    public void setTo(String to) {
+        this.to = to;
+    }
+
+    public void setFrom(String from) {
+        this.from = from;
+    }
+
+    public Mode getMode() {
+        return mode;
+    }
+
+    public boolean isFailOnError() {
+        return failOnError;
+    }
+
+    public boolean isTiming() {
+        return timing;
+    }
+
+    public void setTiming(boolean timing) {
+        this.timing = timing;
+    }
+
+    public void setFailOnError(boolean failOnError) {
+        this.failOnError = failOnError;
+    }
+
+    public List<ProtocolType.Enum> getPreferredProtocols() {
+        return preferredProtocols;
+    }
+
+    public void setPreferredProtocols(List<ProtocolType.Enum> preferredProtocols) {
+        this.preferredProtocols = preferredProtocols;
+    }
+
+    public void setExtraParameterSource(Properties properties){
+        this.extraParameterSource=properties;
+    }
+
+    public void setRecurse(boolean recurse) {
+        this.recurse = recurse;
+    }
+
+    /**
+     * check if the given path denotes a valid remote directory
+     * @param remotePath - the path
+     * @param sms - the storage
+     * @return <code>true</code> if the remote directory exists and is a directory;
+     *         "/" and "." are accepted without asking the storage
+     */
+    protected boolean isValidDirectory(String remotePath, StorageClient sms){
+        if("/".equals(remotePath) || ".".equals(remotePath)){
+            return true;
+        }
+        try{
+            GridFileType gft=sms.listProperties(remotePath);
+            return gft.getIsDirectory();
+        }catch(Exception ex){
+            // a path that cannot be inspected is treated as "not a directory"
+            return false;
+        }
+    }
+
+    /**
+     * Expands "*" and "?" in the file name part of a local path.
+     *
+     * @param original local path, possibly with wildcards in its last segment
+     * @return the original file if it has no wildcards, otherwise the matching files
+     *         in its parent directory (current directory if there is no parent); the
+     *         result of {@link File#listFiles(FilenameFilter)}, which may be null
+     */
+    public File[] resolveWildCards(File original){
+        if(!hasWildCards(original))return new File[]{original};
+        File parent=original.getParentFile();
+        if(parent==null)parent=new File(".");
+        final Pattern pattern=createPattern(original.getName());
+        FilenameFilter filter=new FilenameFilter(){
+            public boolean accept(File file, String name){
+                return pattern.matcher(name).matches();
+            }
+        };
+        return parent.listFiles(filter);
+    }
+
+    protected boolean hasWildCards(File file){
+        return hasWildCards(file.getName());
+    }
+
+    /**
+     * @param name a file name or path
+     * @return true if the name contains "*" or "?"
+     */
+    public boolean hasWildCards(String name){
+        return name.contains("*") || name.contains("?");
+    }
+
+    /**
+     * Translates a wildcard name into a regular expression: "?" matches one
+     * character, "*" matches any sequence, and everything else is matched literally.
+     * Literal parts are quoted so that characters such as "." or "(" in a file name
+     * are not interpreted as regex syntax.
+     */
+    private Pattern createPattern(String nameWithWildcards){
+        StringBuilder regex=new StringBuilder();
+        StringBuilder literal=new StringBuilder();
+        for(int i=0;i<nameWithWildcards.length();i++){
+            char c=nameWithWildcards.charAt(i);
+            if(c=='?' || c=='*'){
+                if(literal.length()>0){
+                    regex.append(Pattern.quote(literal.toString()));
+                    literal.setLength(0);
+                }
+                regex.append(c=='?' ? "." : ".*");
+            }
+            else{
+                literal.append(c);
+            }
+        }
+        if(literal.length()>0){
+            regex.append(Pattern.quote(literal.toString()));
+        }
+        return Pattern.compile(regex.toString());
+    }
+
+    public ProtocolType.Enum getChosenProtocol(){
+        return chosenProtocol;
+    }
+
+    public Long getStartByte() {
+        return startByte;
+    }
+
+    public void setStartByte(Long startByte) {
+        this.startByte = startByte;
+    }
+
+    public Long getEndByte() {
+        return endByte;
+    }
+
+    public void setEndByte(Long endByte) {
+        this.endByte = endByte;
+    }
+
+    /**
+     * checks if a byte range is defined
+     * @return <code>true</code> iff both startByte and endByte are defined
+     */
+    protected boolean isRange(){
+        return startByte!=null && endByte!=null;
+    }
+
+    /**
+     * get the number of bytes in the byte range, or "-1" if the range is open-ended.
+     * Must only be called when {@link #isRange()} is true.
+     * @return the size of the inclusive range startByte..endByte, or -1 when endByte
+     *         is <code>Long.MAX_VALUE</code>
+     */
+    protected long getRangeSize(){
+        if(Long.MAX_VALUE==endByte)return -1;
+        // both indices are inclusive, matching the endByte-startByte+1 used for transfers
+        return endByte-startByte+1;
+    }
+}
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileUploader.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileUploader.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileUploader.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/FileUploader.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,224 @@
+package org.apache.airavata.gfac.provider.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+
+/**
+ * upload local file(s) to a remote location
+ *
+ * @author schuller
+ */
+public class FileUploader extends FileTransferBase{
+
+    /**
+     * Creates an uploader that fails on error.
+     *
+     * @param from local file, directory or wildcard expression
+     * @param to   remote target path
+     * @param mode creation mode for the remote file
+     * @throws FileNotFoundException if the local source does not exist
+     */
+    public FileUploader(String from, String to, Mode mode)throws FileNotFoundException{
+        this(from,to,mode,true);
+    }
+
+    /**
+     * @param from        local file, directory or wildcard expression
+     * @param to          remote target path
+     * @param mode        creation mode for the remote file
+     * @param failOnError when true, the local source is checked for existence immediately
+     * @throws FileNotFoundException if <code>failOnError</code> is set and the local
+     *         source does not exist
+     */
+    public FileUploader(String from, String to, Mode mode, boolean failOnError)throws FileNotFoundException{
+        this.to=to;
+        this.from=from;
+        this.mode=mode;
+        this.failOnError=failOnError;
+        checkOK();
+    }
+
+    public String getFrom() {
+        return from;
+    }
+
+    public String getTo() {
+        return to;
+    }
+
+
+    /**
+     * Runs the upload. A single regular file is sent to <code>to</code>; a directory
+     * or wildcard expression is sent into the remote directory <code>to</code>
+     * (defaulting to "/" when <code>to</code> is null).
+     *
+     * @param sms the storage to upload to
+     * @throws Exception if the remote target is not a directory, the local files
+     *         cannot be listed, or a transfer fails
+     */
+    public void perform(StorageClient sms)throws Exception{
+        File fileSpec=new File(from);
+        boolean isDirectory=fileSpec.isDirectory();
+        boolean hasWildCards=!isDirectory && hasWildCards(fileSpec);
+
+        chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+        Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+
+        if(!hasWildCards && !isDirectory){
+            //single regular file
+            uploadFile(fileSpec,to,sms,chosenProtocol,extraParameters);
+            return;
+        }
+
+        //handle wildcards or directory
+        File[] fileset=hasWildCards?resolveWildCards(fileSpec):fileSpec.listFiles();
+        if(fileset==null){
+            // listFiles() yields null when the directory cannot be read
+            throw new IOException("Cannot list local files for '"+from+"'");
+        }
+
+        // Apply the default before validating, otherwise the default can never take effect.
+        if(to==null)to="/";
+        if(!isValidDirectory(to, sms)){
+            throw new IOException("The specified remote target '"+to+"' is not a directory");
+        }
+        String target=isDirectory?to+"/"+fileSpec.getName():to;
+        sms.createDirectory(target);
+        uploadFiles(fileset,target,sms,chosenProtocol,extraParameters);
+    }
+
+    /**
+     * upload a set of files to a remote directory (which must exist)
+     *
+     * @param files - local files and directories to upload
+     * @param remoteDirectory - remote directory to upload into
+     * @param sms - the storage to upload to
+     * @param protocol - transfer protocol to use
+     * @param extraParameters - protocol specific parameters
+     * @throws Exception if creating a remote directory or a transfer fails
+     */
+    private void uploadFiles(File[]files, String remoteDirectory, StorageClient sms, ProtocolType.Enum protocol,
+            Map<String,String>extraParameters)throws Exception{
+        for(File localFile: files){
+            String target=remoteDirectory+"/"+localFile.getName();
+            if(!localFile.isDirectory()){
+                uploadFile(localFile,target,sms,protocol,extraParameters);
+                continue;
+            }
+            if(!recurse){
+                System.out.println("Skipping directory "+localFile.getAbsolutePath());
+                continue;
+            }
+            File[] fileset=localFile.listFiles();
+            sms.createDirectory(target);
+            if(fileset!=null){
+                uploadFiles(fileset,target,sms,protocol,extraParameters);
+            }
+        }
+    }
+
+    /**
+     * uploads a single regular file
+     *
+     * @param localFile - the file to upload
+     * @param remotePath - remote file path; null means "/&lt;name&gt;", a trailing "/"
+     *        means "inside this directory"
+     * @param sms - the storage to upload to
+     * @param protocol - transfer protocol to use
+     * @param extraParameters - protocol specific parameters
+     * @throws Exception if the local file cannot be read or the transfer fails
+     */
+    private void uploadFile(File localFile, String remotePath, StorageClient sms, ProtocolType.Enum protocol,
+            Map<String,String>extraParameters) throws Exception{
+        long startTime=System.currentTimeMillis();
+        FileInputStream is=null;
+        FileTransferClient ftc=null;
+        try{
+            if(remotePath==null){
+                remotePath="/"+localFile.getName();
+            }
+            else if(remotePath.endsWith("/")){
+                remotePath+=localFile.getName();
+            }
+            System.out.println("Uploading local file '"+localFile.getAbsolutePath()+"' -> '"+sms.getUrl()+"#"+remotePath+"'");
+            is=new FileInputStream(localFile.getAbsolutePath());
+            boolean append=Mode.append.equals(mode);
+            ftc=sms.getImport(remotePath, append, extraParameters, protocol);
+            configure(ftc, extraParameters);
+            if(append)ftc.setAppend(true);
+            String url=ftc.getUrl();
+            System.out.println("File transfer URL : "+url);
+            if(isRange()){
+                System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+                // Skip exactly startByte bytes: ask only for what is still missing, and
+                // stop instead of looping forever if the stream cannot skip any further.
+                long remaining=startByte;
+                while(remaining>0){
+                    long skipped=is.skip(remaining);
+                    if(skipped<=0){
+                        throw new IOException("Cannot skip to byte "+startByte+" of '"+localFile.getAbsolutePath()+"'");
+                    }
+                    remaining-=skipped;
+                }
+                // start/end are inclusive byte indices, hence the +1
+                ftc.writeAllData(is, endByte-startByte+1);
+
+            }else{
+                ftc.writeAllData(is);
+            }
+            copyProperties(localFile, sms, remotePath);
+
+        }finally{
+            if(ftc!=null){
+                try{
+                    ftc.destroy();
+                }catch(Exception ignored){
+                    // best effort: the transfer resource could not be cleaned up
+                }
+            }
+            try{ if(is!=null)is.close(); }catch(Exception ignored){}
+        }
+        if(timing){
+            long duration=System.currentTimeMillis()-startTime;
+            double rate=(double)localFile.length()/(double)duration;
+            System.out.println("Rate: "+rate+ " kB/sec.");
+        }
+    }
+
+    /**
+     * if possible, copy the local executable flag to the remote file
+     * @param sourceFile - local file
+     * @param sms - the storage holding the remote file
+     * @param target - remote file path
+     * @throws Exception declared for compatibility; failures are ignored
+     */
+    private void copyProperties(File sourceFile, StorageClient sms, String target)throws Exception{
+        boolean x=sourceFile.canExecute();
+        try{
+            if(x){
+                sms.changePermissions(target, true, true, x);
+            }
+        }catch(Exception ignored){
+            // best effort: the executable flag could not be set on the remote file
+        }
+    }
+
+    /**
+     * Verifies that the local source exists (after wildcard expansion). Relative
+     * paths are resolved against the <code>user.dir</code> system property.
+     * Does nothing when <code>failOnError</code> is false.
+     */
+    private void checkOK()throws FileNotFoundException{
+        if(!failOnError){
+            return;
+        }
+        File orig=new File(from);
+        if(!orig.isAbsolute()){
+            orig=new File(System.getProperty("user.dir"),from);
+        }
+        File[] files=resolveWildCards(orig);
+        if(files==null){
+            throw new FileNotFoundException("Local import '"+from+"' does not exist.");
+        }
+        for(File f: files){
+            if(!f.exists())throw new FileNotFoundException("Local import '"+from+"' does not exist.");
+        }
+    }
+
+    /** Passes the UFTP secret from the extra parameters to a UFTP transfer client. */
+    private void configure(FileTransferClient ftc, Map<String,String>params){
+        if(ftc instanceof UFTPFileTransferClient){
+            UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+            String secret=params.get(UFTPConstants.PARAM_SECRET);
+            u.setSecret(secret);
+        }
+    }
+}
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/Mode.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/Mode.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/Mode.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/Mode.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,24 @@
+package org.apache.airavata.gfac.provider.utils;
+
+/**
+ * Policies that can be applied when creating a file that may already exist.
+ */
+public enum Mode {
+
+ /** replace an already existing file */
+ overwrite,
+
+ /** add the new content to the end of an already existing file */
+ append,
+
+ /** never replace an existing file; creation fails if the file is already there */
+ nooverwrite;
+
+}
\ No newline at end of file
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/StorageCreator.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/StorageCreator.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/StorageCreator.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/StorageCreator.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,190 @@
+package org.apache.airavata.gfac.provider.utils;
+
+import java.util.Calendar;
+
+import javax.security.auth.x500.X500Principal;
+
+import org.oasisOpen.docs.wsrf.sg2.EntryType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.unigrids.services.atomic.types.PropertyType;
+import org.unigrids.x2006.x04.services.smf.CreateSMSDocument;
+import org.unigrids.x2006.x04.services.smf.StorageBackendParametersDocument.StorageBackendParameters;
+import org.unigrids.x2006.x04.services.smf.StorageDescriptionType;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import de.fzj.unicore.uas.StorageFactory;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.StorageFactoryClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import de.fzj.unicore.wsrflite.xmlbeans.client.RegistryClient;
+import de.fzj.unicore.wsrflite.xmlbeans.sg.Registry;
+
+
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+
+public class StorageCreator {
+ /** logger named after the runtime class of this instance */
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * the initial lifetime (in days) for newly created SMSs;
+ * a termination time is only requested when this is greater than zero
+ */
+ private int initialLifeTime;
+
+ /**
+ * factory URL to use (derived from the BES URL in the constructors)
+ */
+ private String factoryUrl;
+
+ /**
+ * site where to create the storage
+ * NOTE(review): this field is neither assigned nor read anywhere in this class
+ */
+ private String siteName;
+
+ /**
+ * storage type to create; stays null when the constructor without a
+ * storage type is used, in which case no storage description is added
+ * to the creation request
+ */
+ private String storageType;
+
+ /**
+ * client security configuration; its ETD settings are modified in
+ * createStorage() and it is handed to the registry and storage factory clients
+ */
+ private DefaultClientConfiguration secProps;
+
+ /**
+ * optional user name; when not null it is requested as the "xlogin"
+ * user attribute in createStorage()
+ */
+ private String userName;
+
+ /**
+  * Creates a storage creator that requests a specific storage type.
+  *
+  * @param secProps client security configuration
+  * @param besUrl BES factory URL from which the storage factory URL is derived
+  * @param initialLifetime initial lifetime (in days) of the created storage
+  * @param storageType storage type to create
+  * @param userName user name to request as "xlogin" attribute, may be null
+  */
+ public StorageCreator(DefaultClientConfiguration secProps, String besUrl, int initialLifetime, String storageType, String userName) {
+     this.userName = userName;
+     this.storageType = storageType;
+     this.initialLifeTime = initialLifetime;
+     this.secProps = secProps;
+     this.factoryUrl = getStorageFactoryUrl(besUrl);
+ }
+
+
+ /**
+  * Creates a storage creator without a storage type; the storage type
+  * stays null, so no storage description is put into the request.
+  *
+  * @param secProps client security configuration
+  * @param besUrl BES factory URL from which the storage factory URL is derived
+  * @param initialLifetime initial lifetime (in days) of the created storage
+  * @param userName user name to request as "xlogin" attribute, may be null
+  */
+ public StorageCreator(DefaultClientConfiguration secProps, String besUrl, int initialLifetime, String userName) {
+     this(secProps, besUrl, initialLifetime, null, userName);
+ }
+
+
+ /**
+  * Creates a new storage through the storage factory whose URL was derived
+  * from the BES URL. The target site must have a storage factory deployed
+  * together with the BES factory.
+  * <p>
+  * As a side effect the ETD settings of the security configuration are
+  * updated (receiver, issuer certificate chain and, if a user name is set,
+  * the "xlogin" attribute).
+  *
+  * @return client for the newly created storage
+  * @throws Exception if no storage factory URL is available, if the server
+  *         identity (DN) of the factory cannot be determined, or if the
+  *         storage creation fails
+  */
+ public StorageClient createStorage() throws Exception{
+
+     if(factoryUrl == null) {
+         throw new Exception("Cannot create Storage Factory Url");
+     }
+
+     EndpointReferenceType sfEpr= WSUtilities.makeServiceEPR(factoryUrl, StorageFactory.SMF_PORT);
+
+     String dn = findServerName(factoryUrl, sfEpr);
+     if(dn == null) {
+         // findServerName returns null when no matching registry entry carries a DN;
+         // fail with a clear message instead of a NullPointerException from X500Principal
+         throw new Exception("Cannot determine the server identity (DN) for storage factory <"+factoryUrl+">");
+     }
+
+     WSUtilities.addServerIdentity(sfEpr, dn);
+
+     secProps.getETDSettings().setReceiver(new X500Principal(dn));
+     secProps.getETDSettings().setIssuerCertificateChain(secProps.getCredential().getCertificateChain());
+
+     // TODO: remove it afterwards
+     if(userName != null) {
+         secProps.getETDSettings().getRequestedUserAttributes2().put("xlogin", new String[]{userName});
+     }
+
+     StorageFactoryClient sfc = new StorageFactoryClient(sfEpr, secProps);
+
+     if (log.isDebugEnabled()){
+         log.debug("Using storage factory at <"+sfc.getUrl()+">");
+     }
+
+     StorageClient sc = null;
+     try{
+         sc=sfc.createSMS(getCreateSMSDocument());
+
+         String addr=sc.getEPR().getAddress().getStringValue();
+         log.info(addr);
+
+     }catch(Exception ex){
+         log.error("Could not create storage",ex);
+         throw new Exception(ex);
+     }
+
+     return sc;
+ }
+
+ /**
+  * Determines the server identity (DN) for the given endpoint.
+  * <p>
+  * The DN is taken from the endpoint reference itself when present. Only
+  * otherwise is the registry consulted: its URL is built by replacing the
+  * "StorageFactory?res..." part of the given URL with "Registry", and the
+  * first entry whose address starts with the endpoint's base URL and that
+  * carries a DN wins.
+  *
+  * @param besUrl storage factory URL (must contain "StorageFactory?res"
+  *        when the registry has to be consulted)
+  * @param smsEpr endpoint reference whose server identity is wanted
+  * @return the server DN, or null if none could be found
+  * @throws IllegalArgumentException if the registry is needed but the URL
+  *         does not contain "StorageFactory?res"
+  * @throws Exception if the registry lookup fails
+  */
+ protected String findServerName(String besUrl, EndpointReferenceType smsEpr)throws Exception{
+
+     //first, check if server name is already in the EPR...
+     String dn=WSUtilities.extractServerIDFromEPR(smsEpr);
+     if(dn!=null){
+         return dn;
+     }
+
+     //otherwise find a matching service in the registry
+     int besIndex = besUrl.indexOf("StorageFactory?res");
+     if(besIndex < 0){
+         // previously ended in a StringIndexOutOfBoundsException from substring(0,-1)
+         throw new IllegalArgumentException("Not a storage factory URL: "+besUrl);
+     }
+     String registryUrl = besUrl.substring(0, besIndex) + "Registry";
+
+     EndpointReferenceType eprt = WSUtilities.makeServiceEPR(registryUrl, "default_registry", Registry.REGISTRY_PORT);
+
+     RegistryClient registry = new RegistryClient(eprt, secProps);
+
+     String url=smsEpr.getAddress().getStringValue();
+     if(url.contains("/services/"))url=url.substring(0,url.indexOf("/services"));
+     if(log.isDebugEnabled()) log.debug("Checking for services at "+url);
+     for(EntryType entry:registry.listEntries()){
+         if(entry.getMemberServiceEPR().getAddress().getStringValue().startsWith(url)){
+             dn=WSUtilities.extractServerIDFromEPR(entry.getMemberServiceEPR());
+             if(dn!=null){
+                 return dn;
+             }
+         }
+     }
+     return null;
+ }
+
+
+ /**
+  * Derives the storage factory URL from a BES factory URL by replacing
+  * everything from "BESFactory?res" onwards with
+  * "StorageFactory?res=default_storage_factory".
+  *
+  * @param besUrl BES factory URL
+  * @return the storage factory URL, or null if besUrl is null or does not
+  *         contain "BESFactory?res" (createStorage() reports a null
+  *         factory URL as an error)
+  */
+ public static String getStorageFactoryUrl(String besUrl){
+     if(besUrl == null){
+         return null;
+     }
+     int besIndex = besUrl.indexOf("BESFactory?res");
+     if(besIndex < 0){
+         // not a BES factory URL; substring(0,-1) would throw otherwise
+         return null;
+     }
+     return besUrl.substring(0, besIndex) + "StorageFactory?res=default_storage_factory";
+ }
+
+ /**
+  * Prepares the storage creation request.
+  * <p>
+  * A termination time is added when the initial lifetime is positive. A
+  * storage description is added only when a storage type is set, and the
+  * key=value parameters are evaluated only in that case.
+  *
+  * @param keyValueParams storage backend parameters, each of the form
+  *        key=value. NOTE(review): the element at index 0 is skipped
+  *        (processing starts at index 1), which looks inherited from
+  *        command line argument handling - confirm that this is intended.
+  * @return the request document
+  * @throws IllegalArgumentException if an evaluated parameter contains no '='
+  */
+ protected CreateSMSDocument getCreateSMSDocument(String ...keyValueParams){
+     CreateSMSDocument in=CreateSMSDocument.Factory.newInstance();
+     in.addNewCreateSMS();
+     if(initialLifeTime>0){
+         in.getCreateSMS().addNewTerminationTime().setCalendarValue(getTermTime());
+     }
+     if(storageType!=null){
+         if(log.isDebugEnabled()) {
+             log.debug("Will create storage of type : "+storageType);
+         }
+         StorageDescriptionType desc=in.getCreateSMS().addNewStorageDescription();
+         desc.setStorageBackendType(storageType);
+         if(keyValueParams.length>1){
+             //other parameters from the cmdline as key=value
+             StorageBackendParameters params=desc.addNewStorageBackendParameters();
+             for(int i=1;i<keyValueParams.length;i++){
+                 String arg=keyValueParams[i];
+                 String[]sp=arg.split("=",2);
+                 if(sp.length<2){
+                     // previously failed with an ArrayIndexOutOfBoundsException on sp[1]
+                     throw new IllegalArgumentException("Storage parameter '"+arg+"' is not of the form key=value");
+                 }
+                 PropertyType prop=params.addNewProperty();
+                 prop.setName(sp[0]);
+                 prop.setValue(sp[1]);
+                 if(log.isDebugEnabled()) {
+                     log.debug("Have parameter : "+arg);
+                 }
+             }
+         }
+     }
+     return in;
+ }
+
+ /**
+  * Computes the termination time for a new storage.
+  *
+  * @return the current time shifted by the initial lifetime (in days)
+  */
+ protected Calendar getTermTime(){
+     Calendar terminationTime = Calendar.getInstance();
+     // DAY_OF_MONTH is a synonym of DATE
+     terminationTime.add(Calendar.DAY_OF_MONTH, initialLifeTime);
+     return terminationTime;
+ }
+
+
+}
Added: airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/UASDataStagingProcessor.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/UASDataStagingProcessor.java?rev=1550762&view=auto
==============================================================================
--- airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/UASDataStagingProcessor.java (added)
+++ airavata/trunk/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/utils/UASDataStagingProcessor.java Fri Dec 13 16:22:01 2013
@@ -0,0 +1,204 @@
+package org.apache.airavata.gfac.provider.utils;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.StringArrayType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+
+public class UASDataStagingProcessor {
+
+ /**
+  * Adds the data staging elements for inputs, outputs and stdout/stderr to
+  * the given job definition. The storage URL is prefixed with "BFT:" before
+  * it is used.
+  * <p>
+  * NOTE(review): the out message context of the execution context is
+  * replaced here by a new one holding the single String parameter "o1" with
+  * the value "output/analysis-results.tar"; this looks like a temporary
+  * hard-coded output - confirm.
+  *
+  * @param value job definition to extend
+  * @param context job execution context providing application and messages
+  * @param smsUrl URL of the storage used for staging
+  * @throws Exception if building any staging element fails
+  */
+ public static void generateDataStagingElements(JobDefinitionType value, JobExecutionContext context, String smsUrl) throws Exception{
+
+     HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) context
+             .getApplicationContext().getApplicationDeploymentDescription().getType();
+
+     final String bftUrl = "BFT:"+smsUrl;
+
+     if (!context.getInMessageContext().getParameters().isEmpty()) {
+         buildDataStagingFromInputContext(context, value, bftUrl, appDepType);
+     }
+
+     MessageContext outMessage = new MessageContext();
+     ActualParameter resultParam = new ActualParameter();
+     resultParam.getType().changeType(StringParameterType.type);
+     StringParameterType resultType = (StringParameterType)resultParam.getType();
+     resultType.setValue("output/analysis-results.tar");
+     outMessage.addParameter("o1", resultParam);
+     context.setOutMessageContext(outMessage);
+
+     if (!context.getOutMessageContext().getParameters().isEmpty()) {
+         buildFromOutputContext(context, value, bftUrl, appDepType);
+     }
+     createStdOutURIs(value, appDepType, bftUrl, isUnicoreEndpoint(context));
+ }
+
+ /**
+  * Adds a stage-in (source) element for a URI input parameter. The file is
+  * staged to "input/" plus the last path segment of the URI.
+  * <ul>
+  * <li>URIs starting with "file" are fetched from the storage
+  * (smsUrl + "#/" + target name)</li>
+  * <li>URIs starting with "gsiftp", "http" or "rns" are used as they are</li>
+  * <li>any other URI is ignored</li>
+  * </ul>
+  *
+  * @param value job definition to extend
+  * @param smsUrl storage URL
+  * @param inputDir not used by this method
+  * @param inParam input parameter holding the URI
+  * @throws Exception if the staging element cannot be added
+  */
+ private static void createInURISMSElement(JobDefinitionType value,
+         String smsUrl, String inputDir, ActualParameter inParam)
+         throws Exception {
+
+     String uri = ((URIParameterType) inParam.getType()).getValue();
+     //TODO: To add this input file name setting part of Airavata API
+     String fileName = "input/" + new File(uri).getName();
+
+     String source = null;
+     if (uri.startsWith("file")) {
+         source = smsUrl+"#/"+fileName;
+     } else if (uri.startsWith("gsiftp") || uri.startsWith("http") || uri.startsWith("rns")) {
+         // the remote location is handed over unchanged
+         source = uri;
+     }
+
+     if (source != null) {
+         JSDLUtils.addDataStagingSourceElement(value, source, null, fileName);
+     }
+ }
+
+ /**
+  * Adds stage-out (target) elements for the standard output and standard
+  * error files, and for UNICORE endpoints also for the
+  * "UNICORE_SCRIPT_EXIT_CODE" file. All of them are staged to
+  * smsUrl + "#/output/" + file name.
+  * <p>
+  * The file names come from ApplicationProcessor; a null or empty name
+  * falls back to "stdout" and "stderr" respectively.
+  *
+  * @param value job definition to extend
+  * @param appDepType application deployment description
+  * @param smsUrl storage URL
+  * @param isUnicore whether the endpoint is a UNICORE endpoint
+  * @throws Exception if a staging element cannot be added
+  */
+ private static void createStdOutURIs(JobDefinitionType value,
+         HpcApplicationDeploymentType appDepType, String smsUrl,
+         boolean isUnicore) throws Exception {
+
+     String stdout = ApplicationProcessor.getApplicationStdOut(value, appDepType);
+
+     String stderr = ApplicationProcessor.getApplicationStdErr(value, appDepType);
+
+     String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+             : stdout;
+     String stdoutURI = smsUrl+"#/output/"+stdoutFileName;
+     JSDLUtils.addDataStagingTargetElement(value, null, stdoutFileName,
+             stdoutURI);
+
+     // the null check must look at stderr itself (it used to test stdout, which
+     // caused a NullPointerException for a null stderr and ignored a configured
+     // stderr name whenever stdout was null)
+     String stderrFileName = (stderr == null || stderr.equals("")) ? "stderr"
+             : stderr;
+     String stderrURI = smsUrl+"#/output/"+stderrFileName;
+     JSDLUtils.addDataStagingTargetElement(value, null, stderrFileName,
+             stderrURI);
+
+     if(isUnicore) {
+         String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+         String scriptExitCode = smsUrl+"#/output/"+scriptExitCodeFName;
+         JSDLUtils.addDataStagingTargetElement(value, null,
+                 scriptExitCodeFName, scriptExitCode);
+     }
+
+ }
+
+
+ /**
+  * Adds a stage-out (target) element for an output file given by name; the
+  * file is staged to smsUrl + "#/output/" + name. A null or empty name is
+  * ignored.
+  *
+  * @param value job definition to extend
+  * @param appDeptype not used by this method
+  * @param smsUrl storage URL
+  * @param prmValue name of the output file
+  * @throws Exception if the staging element cannot be added
+  */
+ private static void createOutStringElements(JobDefinitionType value,
+         HpcApplicationDeploymentType appDeptype, String smsUrl, String prmValue) throws Exception {
+
+     if(prmValue != null && !prmValue.isEmpty()) {
+         JSDLUtils.addDataStagingTargetElement(value, null, prmValue, smsUrl + "#/output/"+prmValue);
+     }
+ }
+
+
+ /**
+  * Adds a stage-out (target) element that sends the file named like the
+  * last path segment of the given URI to that URI.
+  *
+  * @param value job definition to extend
+  * @param prmValue target URI
+  * @throws Exception if the staging element cannot be added
+  */
+ private static void createOutURIElement(JobDefinitionType value,
+         String prmValue) throws Exception {
+     File target = new File(prmValue);
+     JSDLUtils.addDataStagingTargetElement(value, null, target.getName(), prmValue);
+ }
+
+
+ /**
+  * Adds stage-out elements for all parameters of the out message context.
+  * <ul>
+  * <li>"URI" and "URIArray" parameters are staged to the given URI(s)</li>
+  * <li>"String" and "StringArray" parameters are treated as output file
+  * names and staged to the storage</li>
+  * <li>parameters of any other type are ignored</li>
+  * </ul>
+  *
+  * @param context job execution context providing the out message context
+  * @param value job definition to extend
+  * @param smsUrl storage URL
+  * @param appDepType application deployment description
+  * @return the job definition that was passed in
+  * @throws Exception if a staging element cannot be added
+  */
+ private static JobDefinitionType buildFromOutputContext(JobExecutionContext context,
+         JobDefinitionType value, String smsUrl,
+         HpcApplicationDeploymentType appDepType) throws Exception {
+
+     Map<String, Object> outputParams = context.getOutMessageContext().getParameters();
+
+     for (Object rawParam : outputParams.values()) {
+         ActualParameter outParam = (ActualParameter) rawParam;
+         String paramDataType = outParam.getType().getType().toString();
+
+         if (paramDataType.equals("URI")) {
+             // a single URI is a single staging target
+             createOutURIElement(value, ((URIParameterType) outParam.getType()).getValue());
+         } else if (paramDataType.equals("URIArray")) {
+             // one staging target per URI
+             for (String uri : ((URIArrayType) outParam.getType()).getValueArray()) {
+                 createOutURIElement(value, uri);
+             }
+         } else if (paramDataType.equals("String")) {
+             // a string names an output file on the storage
+             createOutStringElements(value, appDepType, smsUrl,
+                     ((StringParameterType) outParam.getType()).getValue());
+         } else if (paramDataType.equals("StringArray")) {
+             // one output file per string
+             for (String fileName : ((StringArrayType) outParam.getType()).getValueArray()) {
+                 createOutStringElements(value, appDepType, smsUrl, fileName);
+             }
+         }
+     }
+
+     return value;
+ }
+
+
+ /**
+  * Processes all parameters of the in message context: "URI" parameters
+  * become stage-in elements, "String" parameters are added as application
+  * arguments, parameters of any other type are ignored.
+  *
+  * @param context job execution context providing the in message context
+  * @param value job definition to extend
+  * @param smsUrl storage URL
+  * @param appDepType application deployment description
+  * @throws Exception if an element cannot be added
+  */
+ private static void buildDataStagingFromInputContext(JobExecutionContext context, JobDefinitionType value, String smsUrl, HpcApplicationDeploymentType appDepType)
+         throws Exception {
+
+     // TODO set data directory
+     Map<String, Object> inputParams = context.getInMessageContext().getParameters();
+
+     for (Object rawParam : inputParams.values()) {
+         ActualParameter inParam = (ActualParameter) rawParam;
+         String paramDataType = inParam.getType().getType().toString();
+
+         if (paramDataType.equals("URI")) {
+             // URIs are turned into stage-in elements
+             createInURISMSElement(value, smsUrl, appDepType.getInputDataDirectory(), inParam);
+         } else if (paramDataType.equals("String")) {
+             // strings are turned into job arguments
+             ApplicationProcessor.addApplicationArgument(value, appDepType,
+                     ((StringParameterType) inParam.getType()).getValue());
+         }
+     }
+
+ }
+
+ public static boolean isUnicoreEndpoint(JobExecutionContext context) {
+ return ( (context.getApplicationContext().getHostDescription().getType() instanceof UnicoreHostType)?true:false );
+ }
+
+}