You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2014/08/28 17:45:19 UTC
[3/4] git commit: Allow usee to send the file URL to move the files.
AIRAVATA-1419
Allow usee to send the file URL to move the files. AIRAVATA-1419
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6c500f2c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6c500f2c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6c500f2c
Branch: refs/heads/master
Commit: 6c500f2cc923c58600a4d1094e119094c618cf2c
Parents: 6c4471a
Author: raminder <ra...@apache.org>
Authored: Thu Aug 28 11:17:06 2014 -0400
Committer: raminder <ra...@apache.org>
Committed: Thu Aug 28 11:17:06 2014 -0400
----------------------------------------------------------------------
.../ssh/handler/AdvancedSCPInputHandler.java | 123 ++++++++++++-------
.../ssh/handler/AdvancedSCPOutputHandler.java | 14 +++
2 files changed, 90 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index 86dcb22..7e3ecbb 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -52,6 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.*;
/**
@@ -133,11 +135,6 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
this.passPhrase);
}
// Server info
- ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
- Cluster pbsCluster = null;
- // here doesn't matter what the job manager is because we are only doing some file handling
- // not really dealing with monitoring or job submission, so we pa
- pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
if (index < oldIndex) {
parentPath = oldFiles.get(index);
@@ -149,48 +146,80 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
}
DataTransferDetails detail = new DataTransferDetails();
TransferStatus status = new TransferStatus();
-
- MessageContext input = jobExecutionContext.getInMessageContext();
- Set<String> parameters = input.getParameters().keySet();
- for (String paramName : parameters) {
- ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
- String paramValue = MappingFactory.toString(actualParameter);
- //TODO: Review this with type
- if ("URI".equals(actualParameter.getType().getType().toString())) {
- if (index < oldIndex) {
- log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
- ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
- data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
- } else {
- String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
- ((URIParameterType) actualParameter.getType()).setValue(stageInputFile);
- StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
- status.setTransferState(TransferState.UPLOAD);
- detail.setTransferStatus(status);
- detail.setTransferDescription("Input Data Staged: " + stageInputFile);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
-
- GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
- }
- } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
- List<String> newFiles = new ArrayList<String>();
- for (String paramValueEach : split) {
- if (index < oldIndex) {
- log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
- newFiles.add(oldFiles.get(index));
- data.append(oldFiles.get(index++)).append(",");
- } else {
- String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
- StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
- GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
- newFiles.add(stageInputFiles);
- }
- }
- ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
- }
- inputNew.getParameters().put(paramName, actualParameter);
- }
+ Cluster pbsCluster = null;
+ // here doesn't matter what the job manager is because we are only doing some file handling
+ // not really dealing with monitoring or job submission, so we pa
+ String lastHost = null;
+
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ // TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ try {
+ URL file = new URL(paramValue);
+ this.userName = file.getUserInfo();
+ this.hostName = file.getHost();
+ paramValue = file.getPath();
+ } catch (MalformedURLException e) {
+ log.error(e.getLocalizedMessage(),e);
+ }
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ if (pbsCluster == null && (lastHost == null || !lastHost.equals(hostName))) {
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ }
+ lastHost = hostName;
+
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+ } else {
+ String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFile);
+ StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ }
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ try {
+ URL file = new URL(paramValue);
+ this.userName = file.getUserInfo();
+ this.hostName = file.getHost();
+ paramValueEach = file.getPath();
+ } catch (MalformedURLException e) {
+ log.error(e.getLocalizedMessage(),e);
+ }
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ if (pbsCluster == null && (lastHost == null || !lastHost.equals(hostName))) {
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ }
+ lastHost = hostName;
+
+ if (index < oldIndex) {
+ log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+ newFiles.add(oldFiles.get(index));
+ data.append(oldFiles.get(index++)).append(",");
+ } else {
+ String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+ StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+ GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+ newFiles.add(stageInputFiles);
+ }
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
} catch (Exception e) {
log.error(e.getMessage());
throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index 9b9f7b2..116d769 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -45,6 +45,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -117,12 +119,24 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
this.passPhrase);
}
// Server info
+ if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){
+ try{
+ URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
+ this.userName = outputPathURL.getUserInfo();
+ this.hostName = outputPathURL.getHost();
+ outputPath = outputPathURL.getPath();
+ } catch (MalformedURLException e) {
+ log.error(e.getLocalizedMessage(),e);
+ }
+ }
ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ if(!jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
+ File.separator;
pbsCluster.makeDirectory(outputPath);
+ }
pbsCluster.scpTo(outputPath, standardError);
pbsCluster.scpTo(outputPath, standardOutput);
List<DataObjectType> outputArray = new ArrayList<DataObjectType>();