Posted to commits@airavata.apache.org by ra...@apache.org on 2014/08/28 17:45:19 UTC

[3/4] git commit: Allow user to send the file URL to move the files. AIRAVATA-1419

Allow user to send the file URL to move the files. AIRAVATA-1419

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6c500f2c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6c500f2c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6c500f2c

Branch: refs/heads/master
Commit: 6c500f2cc923c58600a4d1094e119094c618cf2c
Parents: 6c4471a
Author: raminder <ra...@apache.org>
Authored: Thu Aug 28 11:17:06 2014 -0400
Committer: raminder <ra...@apache.org>
Committed: Thu Aug 28 11:17:06 2014 -0400

----------------------------------------------------------------------
 .../ssh/handler/AdvancedSCPInputHandler.java    | 123 ++++++++++++-------
 .../ssh/handler/AdvancedSCPOutputHandler.java   |  14 +++
 2 files changed, 90 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
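
Both handlers now accept a full file URL, with the user and host embedded, in place of a bare path, and pull the pieces apart with java.net.URL. A minimal sketch of that parsing, with a hypothetical ftp:// value; note that java.net.URL throws MalformedURLException for schemes it has no handler for (such as scp), so a value must use a JDK-recognized scheme or the handler keeps its configured user and host:

    import java.net.MalformedURLException;
    import java.net.URL;

    public class FileUrlSketch {
        public static void main(String[] args) {
            // Hypothetical parameter value; any scheme the JDK recognizes will parse.
            String paramValue = "ftp://raminder@gw111.iu.xsede.org/home/raminder/input.txt";
            try {
                URL file = new URL(paramValue);
                System.out.println(file.getUserInfo()); // raminder
                System.out.println(file.getHost());     // gw111.iu.xsede.org
                System.out.println(file.getPath());     // /home/raminder/input.txt
            } catch (MalformedURLException e) {
                // Plain paths and unknown schemes land here; the handler logs
                // the error and keeps its configured userName/hostName.
                System.err.println(e.getLocalizedMessage());
            }
        }
    }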


http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index 86dcb22..7e3ecbb 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -52,6 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.*;
 
 /**
@@ -133,11 +135,6 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
                         this.passPhrase);
             }
             // Server info
-            ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
-            Cluster pbsCluster = null;
-            // here doesn't matter what the job manager is because we are only doing some file handling
-            // not really dealing with monitoring or job submission, so we pa
-            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
             String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
             if (index < oldIndex) {
                 parentPath = oldFiles.get(index);
@@ -149,48 +146,80 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
             }
             DataTransferDetails detail = new DataTransferDetails();
             TransferStatus status = new TransferStatus();
-          
-            MessageContext input = jobExecutionContext.getInMessageContext();
-            Set<String> parameters = input.getParameters().keySet();
-            for (String paramName : parameters) {
-                ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
-                String paramValue = MappingFactory.toString(actualParameter);
-                //TODO: Review this with type
-                if ("URI".equals(actualParameter.getType().getType().toString())) {
-                    if (index < oldIndex) {
-                        log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-                        ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
-                        data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
-                    } else {
-                        String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
-                        ((URIParameterType) actualParameter.getType()).setValue(stageInputFile);
-                        StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
-                        status.setTransferState(TransferState.UPLOAD);
-                        detail.setTransferStatus(status);
-                        detail.setTransferDescription("Input Data Staged: " + stageInputFile);
-                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-                
-                        GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-                    }
-                } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
-                    List<String> newFiles = new ArrayList<String>();
-                    for (String paramValueEach : split) {
-                        if (index < oldIndex) {
-                            log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
-                            newFiles.add(oldFiles.get(index));
-                            data.append(oldFiles.get(index++)).append(",");
-                        } else {
-                            String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
-                            StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
-                            GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-                            newFiles.add(stageInputFiles);
-                        }
-                    }
-                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
-                }
-                inputNew.getParameters().put(paramName, actualParameter);
-            }
+            Cluster pbsCluster = null;
+            // It doesn't matter what the job manager is because we are only doing some file handling,
+            // not really dealing with monitoring or job submission, so we pass a PBS job manager as a placeholder.
+            String lastHost = null;
+            
+			MessageContext input = jobExecutionContext.getInMessageContext();
+			Set<String> parameters = input.getParameters().keySet();
+			for (String paramName : parameters) {
+				ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+				String paramValue = MappingFactory.toString(actualParameter);
+				// TODO: Review this with type
+				if ("URI".equals(actualParameter.getType().getType().toString())) {
+					try {
+						URL file = new URL(paramValue);
+						this.userName = file.getUserInfo();
+						this.hostName = file.getHost();
+						paramValue = file.getPath();
+					} catch (MalformedURLException e) {
+						log.error(e.getLocalizedMessage(), e);
+					}
+					ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+					if (pbsCluster == null || !hostName.equals(lastHost)) {
+						pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+					}
+					lastHost = hostName;
+
+					if (index < oldIndex) {
+						log.info("Input File: " + paramValue + " is already transferred, so we skip this operation.");
+						((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+						data.append(oldFiles.get(index++)).append(","); // reuse the already transferred file and increment the index
+					} else {
+						String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
+						((URIParameterType) actualParameter.getType()).setValue(stageInputFile);
+						StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+						status.setTransferState(TransferState.UPLOAD);
+						detail.setTransferStatus(status);
+						detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+						registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+						GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+					}
+				} else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+					List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+					List<String> newFiles = new ArrayList<String>();
+					for (String paramValueEach : split) {
+						try {
+							URL file = new URL(paramValueEach);
+							this.userName = file.getUserInfo();
+							this.hostName = file.getHost();
+							paramValueEach = file.getPath();
+						} catch (MalformedURLException e) {
+							log.error(e.getLocalizedMessage(), e);
+						}
+						ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+						if (pbsCluster == null || !hostName.equals(lastHost)) {
+							pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+						}
+						lastHost = hostName;
+
+						if (index < oldIndex) {
+							log.info("Input File: " + paramValueEach + " is already transferred, so we skip this operation.");
+							newFiles.add(oldFiles.get(index));
+							data.append(oldFiles.get(index++)).append(",");
+						} else {
+							String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+							StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+							GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+							newFiles.add(stageInputFiles);
+						}
+					}
+					((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+				}
+				inputNew.getParameters().put(paramName, actualParameter);
+			}
         } catch (Exception e) {
             log.error(e.getMessage());
             throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index 9b9f7b2..116d769 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -45,6 +45,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -117,12 +119,24 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
                         this.passPhrase);
             }
             // Server info
+            if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null) {
+                try {
+                    URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
+                    this.userName = outputPathURL.getUserInfo();
+                    this.hostName = outputPathURL.getHost();
+                    outputPath = outputPathURL.getPath();
+                } catch (MalformedURLException e) {
+                    log.error(e.getLocalizedMessage(), e);
+                }
+            }
             ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
 
             Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+            if (!jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()) {
             outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
                     + File.separator;
             pbsCluster.makeDirectory(outputPath);
+            }
             pbsCluster.scpTo(outputPath, standardError);
             pbsCluster.scpTo(outputPath, standardOutput);
             List<DataObjectType> outputArray = new ArrayList<DataObjectType>();
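
On the output side, the new isPersistOutputData() branch only creates the per-run subdirectory when the user has not asked to persist output data; a persisted run is copied to the supplied directory as-is. A small sketch of that path logic, with hypothetical IDs:

    import java.io.File;

    public class OutputPathSketch {
        // Persisted output goes to the user-supplied directory unchanged;
        // transient output gets a per-experiment/per-task subdirectory.
        static String resolveOutputPath(String outputPath, boolean persistOutputData,
                                        String experimentId, String taskId) {
            if (!persistOutputData) {
                return outputPath + File.separator + experimentId + "-" + taskId + File.separator;
            }
            return outputPath;
        }

        public static void main(String[] args) {
            System.out.println(resolveOutputPath("/N/u/raminder/outputs", false, "EXP_42", "TASK_7"));
            System.out.println(resolveOutputPath("/N/u/raminder/outputs", true, "EXP_42", "TASK_7"));
        }
    }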