Posted to commits@airavata.apache.org by sh...@apache.org on 2015/10/05 22:38:06 UTC

airavata git commit: Set sub task model destination and source with correct URI string

Repository: airavata
Updated Branches:
  refs/heads/master 4d07c015b -> 808ff55bd


Set sub task model destination and source with correct URI string


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/808ff55b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/808ff55b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/808ff55b

Branch: refs/heads/master
Commit: 808ff55bd1cd79b667077166bbaa9ab0314934f0
Parents: 4d07c01
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Oct 5 16:37:58 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Oct 5 16:37:58 2015 -0400

----------------------------------------------------------------------
 .../gfac/core/context/ProcessContext.java       |  13 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |   1 +
 .../airavata/gfac/impl/GFacEngineImpl.java      |  34 ++-
 .../impl/task/AdvancedSCPDataStageTask.java     | 256 ++++++++++---------
 4 files changed, 173 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
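
Context for the hunks below: before this change, GFacEngineImpl built the staging destination as
processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir(), i.e. a bare
"SCP:/path/to/dir" string with no user, host or port. The commit instead builds a full URI from the
ServerInfo that is now cached on the ProcessContext. A minimal, self-contained sketch of that kind of
construction (the host, user and path values are made up for illustration and are not taken from any
real deployment):

import java.net.URI;
import java.net.URISyntaxException;

// Sketch only. java.net.URI's seven-argument constructor is
// URI(scheme, userInfo, host, port, path, query, fragment); note that the
// userInfo (login user) argument comes before the host argument.
public class StagingUriSketch {
    public static void main(String[] args) throws URISyntaxException {
        URI destination = new URI("SCP", "loginUser", "compute.example.org", 22,
                "/scratch/job-0001/workingDir", null, null);
        // Prints: SCP://loginUser@compute.example.org:22/scratch/job-0001/workingDir
        System.out.println(destination.toString());
    }
}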


http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 5a773e7..8655142 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.core.context;
 
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -79,7 +80,9 @@ public class ProcessContext {
 	private ResourceJobManager resourceJobManager;
 	private boolean handOver;
 	private boolean cancel;
-	/**
+    private ServerInfo serverInfo;
+
+    /**
 	 * Note: ProcessContext properties use a lazy loading approach. At runtime some properties will be null
 	 * unless they have been accessed previously. Once a property has been accessed through the API, it will be set to the correct value.
 	 */
@@ -361,4 +364,12 @@ public class ProcessContext {
 	public boolean isInterrupted(){
 		return this.cancel || this.handOver;
 	}
+
+    public void setServerInfo(ServerInfo serverInfo) {
+        this.serverInfo = serverInfo;
+    }
+
+    public ServerInfo getServerInfo() {
+        return serverInfo;
+    }
 }
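
The javadoc note in the hunk above describes ProcessContext's lazy loading convention: a property stays
null until its getter is first called, after which it is resolved and cached. A generic sketch of that
pattern (the class and field names here are hypothetical, not the actual ProcessContext code):

// Hypothetical illustration of the lazy loading convention described above.
public class LazyContext {
    private Resource resource;             // null until first accessed

    public Resource getResource() {
        if (resource == null) {
            resource = loadResource();     // resolved on first access, then cached
        }
        return resource;
    }

    private Resource loadResource() {
        return new Resource();
    }

    static class Resource {
    }
}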

http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbba17b..e60e6eb 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -210,6 +210,7 @@ public abstract class Factory {
 		if (remoteCluster == null) {
 			String hostName = Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(computeResourceId).getHostName();
 			ServerInfo serverInfo = new ServerInfo(processContext.getComputeResourcePreference().getLoginUserName(), hostName);
+            processContext.setServerInfo(serverInfo);
 			JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
 			AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
 			remoteCluster = new HPCRemoteCluster(serverInfo, jobManagerConfiguration, authenticationInfo);
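
The Factory change above also stores the ServerInfo used to create the HPCRemoteCluster on the
ProcessContext, so the URI-building code in GFacEngineImpl (next file) can read the login user name,
host and port back instead of re-deriving them. A rough sketch of that flow, using only the accessors
visible in this diff (the variable names are illustrative and the fragment is not compilable on its own):

// In Factory, once the remote cluster is created for a process:
ServerInfo serverInfo = new ServerInfo(loginUserName, hostName);
processContext.setServerInfo(serverInfo);

// Later, in GFacEngineImpl, when the data staging sub task model is built:
ServerInfo cached = processContext.getServerInfo();
String user = cached.getUserName();   // intended as the URI user info
String host = cached.getHost();       // intended as the URI host
int port = cached.getPort();          // intended as the URI port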

http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index b2be6be..a7f6b32 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -25,9 +25,11 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
@@ -68,6 +70,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
@@ -251,7 +255,7 @@ public class GFacEngineImpl implements GFacEngine {
 					case URI:
 						try {
 							taskCtx = getDataStagingTaskContext(processContext, processInput);
-						} catch (TException e) {
+						} catch (TException | TaskException e) {
 							throw new GFacException("Error while serializing data staging sub task model");
 						}
 						saveTaskModel(taskCtx);
@@ -442,7 +446,7 @@ public class GFacEngineImpl implements GFacEngine {
 				case URI: case STDERR: case STDOUT:
 					try {
 						taskCtx = getDataStagingTaskContext(processContext, processOutput);
-					} catch (TException e) {
+					} catch (TException | TaskException e) {
 						throw new GFacException("Thrift model to byte[] conversion issue", e);
 					}
 					saveTaskModel(taskCtx);
@@ -557,7 +561,7 @@ public class GFacEngineImpl implements GFacEngine {
 	}
 
 	private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput)
-			throws TException {
+            throws TException, TaskException {
 		TaskContext taskCtx = new TaskContext();
 		taskCtx.setParentProcessContext(processContext);
 		// create new task model for this task
@@ -572,7 +576,15 @@ public class GFacEngineImpl implements GFacEngine {
 		// create data staging sub task model
 		DataStagingTaskModel submodel = new DataStagingTaskModel();
 		submodel.setSource(processInput.getValue());
-		submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir());
+        ServerInfo serverInfo = processContext.getServerInfo();
+        URI destination = null;
+        try {
+            destination = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getHost(),
+                    serverInfo.getUserName(), serverInfo.getPort(), processContext.getWorkingDir(), null, null);
+        } catch (URISyntaxException e) {
+            throw new TaskException("Error while constructing destination file URI");
+        }
+        submodel.setDestination(destination.toString());
 		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
 		taskCtx.setTaskModel(taskModel);
         taskCtx.setProcessInput(processInput);
@@ -580,7 +592,7 @@ public class GFacEngineImpl implements GFacEngine {
 	}
 
 	private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
-			throws TException {
+            throws TException, TaskException {
 		TaskContext taskCtx = new TaskContext();
 		taskCtx.setParentProcessContext(processContext);
 		// create new task model for this task
@@ -596,8 +608,16 @@ public class GFacEngineImpl implements GFacEngine {
 		String remoteOutputDir = processContext.getOutputDir();
 		remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
 		DataStagingTaskModel submodel = new DataStagingTaskModel();
-		submodel.setSource(processContext.getDataMovementProtocol().name() + ":" + remoteOutputDir + processOutput
-				.getValue());
+        ServerInfo serverInfo = processContext.getServerInfo();
+        URI source = null;
+        try {
+            source = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getHost(),
+                    serverInfo.getUserName(), serverInfo.getPort(), remoteOutputDir + processOutput.getValue(), null, null);
+        } catch (URISyntaxException e) {
+            throw new TaskException("Error while constructing source file URI");
+        }
+        submodel.setSource(source.toString());
+        // TODO: once third-party scp is implemented, we can fix the destination location below to the correct one.
 		String localWorkingDir = processContext.getLocalWorkingDir();
 		submodel.setDestination("file://" + localWorkingDir);
 		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
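
On the consumer side, AdvancedSCPDataStageTask (next file) parses these strings back with java.net.URI:
it compares the host and user info of source and destination to decide between a local copy and scp, and
takes the staged file name from the last segment of the URI path. A small self-contained sketch of that
parsing (the URI value is a placeholder):

import java.net.URI;
import java.net.URISyntaxException;

// Sketch only: parse a staging URI and pull out the pieces the task needs.
public class StagingUriParseSketch {
    public static void main(String[] args) throws URISyntaxException {
        URI source = new URI("SCP://loginUser@compute.example.org:22/scratch/job-0001/outputDir/result.dat");
        String host = source.getHost();          // compute.example.org
        String userInfo = source.getUserInfo();  // loginUser
        String path = source.getPath();          // /scratch/job-0001/outputDir/result.dat
        // URI paths always use '/', so the file name is everything after the last '/'.
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        System.out.println(host + " " + userInfo + " " + fileName);
    }
}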

http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
index 1c72fab..269ce10 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -70,68 +70,86 @@ import java.util.Map;
  * This will be used for both input file staging and output file staging; hence, if you change any part of the logic
  * in this class, please make sure it still works for both the input and output cases.
  */
-public class AdvancedSCPDataStageTask implements Task{
-	private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
-	private static final int DEFAULT_SSH_PORT = 22;
-	private String password;
-	private String publicKeyPath;
-	private String passPhrase;
-	private String privateKeyPath;
-	private String userName;
-	private String hostName;
-	private String inputPath;
+public class AdvancedSCPDataStageTask implements Task {
+    private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+    private String password;
+    private String publicKeyPath;
+    private String passPhrase;
+    private String privateKeyPath;
+    private String userName;
+    private String hostName;
+    private String inputPath;
 
-	@Override
-	public void init(Map<String, String> propertyMap) throws TaskException {
-		inputPath = propertyMap.get("inputPath");
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        inputPath = propertyMap.get("inputPath");
         hostName = propertyMap.get("hostName");
         userName = propertyMap.get("userName");
-	}
+    }
 
-	@Override
-	public TaskStatus execute(TaskContext taskContext) {
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
         TaskStatus status = new TaskStatus(TaskState.CREATED);
         AuthenticationInfo authenticationInfo = null;
         DataStagingTaskModel subTaskModel = null;
-		ProcessState processState = taskContext.getParentProcessContext().getProcessState();
-		if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-			OutputDataObjectType processOutput = taskContext.getProcessOutput();
-			if (processOutput != null && processOutput.getValue() == null) {
-				log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
-						taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
-						processOutput.getName());
-				status = new TaskStatus(TaskState.FAILED);
-				if (processOutput.isIsRequired()) {
-					status.setReason("File name is null, but this output's isRequired bit is not set");
-				} else {
-					status.setReason("File name is null");
-				}
-				return status;
-			}
-		} else if (processState == ProcessState.INPUT_DATA_STAGING) {
-			InputDataObjectType processInput = taskContext.getProcessInput();
-			if (processInput != null && processInput.getValue() == null) {
-				log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
-						taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
-						processInput.getName());
-				status = new TaskStatus(TaskState.FAILED);
-				if (processInput.isIsRequired()) {
-					status.setReason("File name is null, but this input's isRequired bit is not set");
-				} else {
-					status.setReason("File name is null");
-				}
-				return status;
-			}
-		} else {
-			status.setState(TaskState.FAILED);
-			status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
-					"" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
-		}
+        String localDataDir = null;
+        ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+        try {
+            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+                    (taskContext.getTaskModel());
+            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                OutputDataObjectType processOutput = taskContext.getProcessOutput();
+                if (processOutput != null && processOutput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processOutput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processOutput.isIsRequired()) {
+                        status.setReason("File name is null, but this output's isRequired bit is not set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    localDataDir = subTaskModel.getDestination();
+                    return status;
+                }
+            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+                InputDataObjectType processInput = taskContext.getProcessInput();
+                if (processInput != null && processInput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processInput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processInput.isIsRequired()) {
+                        status.setReason("File name is null, but this input's isRequired bit is not set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else {
+                status.setState(TaskState.FAILED);
+                status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+                        "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+            }
 
-		try {
             // use cp instead of scp if the source and destination host and user name are the same.
             URI sourceURI = new URI(subTaskModel.getSource());
-            URI destinationURI = new URI(subTaskModel.getDestination());
+            String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+                    sourceURI.getPath().length());
+            String targetPath = null;
+            String targetFilePath = null;
+            URI destinationURI = null;
+            if (localDataDir != null) {
+                targetPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+                        taskContext.getParentProcessContext().getProcessId();
+                targetFilePath = targetPath + File.separator + fileName;
+                destinationURI = new URI("SCP", hostName, targetFilePath, null);
+                subTaskModel.setDestination(destinationURI.toString());
+
+            } else {
+                destinationURI = new URI(subTaskModel.getDestination());
+            }
 
             if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
                     && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
@@ -145,8 +163,8 @@ public class AdvancedSCPDataStageTask implements Task{
             String tokenId = taskContext.getParentProcessContext().getTokenId();
             CredentialReader credentialReader = GFacUtils.getCredentialReader();
             Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
-            if (credential instanceof SSHCredential){
-                SSHCredential sshCredential =  (SSHCredential)credential;
+            if (credential instanceof SSHCredential) {
+                SSHCredential sshCredential = (SSHCredential) credential;
                 byte[] publicKey = sshCredential.getPublicKey();
                 publicKeyPath = writeFileToDisk(publicKey);
                 byte[] privateKey = sshCredential.getPrivateKey();
@@ -154,7 +172,7 @@ public class AdvancedSCPDataStageTask implements Task{
                 passPhrase = sshCredential.getPassphrase();
 //                userName = sshCredential.getPortalUserName(); // this might not be the same as the login user name
                 authenticationInfo = getSSHKeyAuthentication();
-            }else {
+            } else {
                 String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
                 log.error(msg);
                 status.setState(TaskState.FAILED);
@@ -166,8 +184,7 @@ public class AdvancedSCPDataStageTask implements Task{
                 return status;
             }
             status = new TaskStatus(TaskState.COMPLETED);
-            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
-                    (taskContext.getTaskModel());
+
 
             File templocalDataDir = GFacUtils.getLocalDataDir(taskContext);
             if (!templocalDataDir.exists()) {
@@ -176,37 +193,30 @@ public class AdvancedSCPDataStageTask implements Task{
                 }
             }
 
-            String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
-                    sourceURI.getPath().length());
             String filePath = templocalDataDir + File.separator + fileName;
 
             ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
             Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
-	        if (processState == ProcessState.INPUT_DATA_STAGING) {
+            if (processState == ProcessState.INPUT_DATA_STAGING) {
                 inputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
                 status.setReason("Successfully staged input data");
-            }else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-		        String targetPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
-				        taskContext.getParentProcessContext().getProcessId();
-		        SSHUtils.makeDirectory(targetPath, sshSession);
-		        String targetFilePath =  targetPath + File.separator + fileName;
-		        destinationURI = new URI("SCP", hostName, targetFilePath, null);
-		        subTaskModel.setDestination(destinationURI.getPath());
-		        // TODO - save updated subtask model with new destination
-		        outputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
-		        status.setReason("Successfully staged output data");
-	        }
-        }  catch (TException e) {
-			String msg = "Couldn't create subTask model thrift model";
-			log.error(msg, e);
-			status.setState(TaskState.FAILED);
-			status.setReason(msg);
-			ErrorModel errorModel = new ErrorModel();
-			errorModel.setActualErrorMessage(e.getMessage());
-			errorModel.setUserFriendlyMessage(msg);
-			taskContext.getTaskModel().setTaskError(errorModel);
-			return status;
-		} catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
+            } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                SSHUtils.makeDirectory(targetPath, sshSession);
+                // TODO - save updated subtask model with new destination
+                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
+                status.setReason("Successfully staged output data");
+            }
+        } catch (TException e) {
+            String msg = "Couldn't create subTask model thrift model";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+            return status;
+        } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
             String msg = "Failed while reading credentials";
             log.error(msg, e);
             status.setState(TaskState.FAILED);
@@ -243,7 +253,7 @@ public class AdvancedSCPDataStageTask implements Task{
             errorModel.setActualErrorMessage(e.getMessage());
             errorModel.setUserFriendlyMessage(msg);
             taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (JSchException | IOException e ) {
+        } catch (JSchException | IOException e) {
             String msg = "Failed to do scp with client";
             log.error(msg, e);
             status.setState(TaskState.FAILED);
@@ -263,7 +273,7 @@ public class AdvancedSCPDataStageTask implements Task{
             taskContext.getTaskModel().setTaskError(errorModel);
         }
         return status;
-	}
+    }
 
     private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
         StringBuilder sb = new StringBuilder("rsync -cr ");
@@ -272,38 +282,38 @@ public class AdvancedSCPDataStageTask implements Task{
         taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
     }
 
-	private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
-			destinationURI, String filePath) throws SSHApiException, IOException, JSchException {
-		/**
-		 * scp remote client file to airavata local dir.
-		 */
-		SSHUtils.scpFrom(sourceURI.getPath(), filePath, sshSession);
+    private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+            destinationURI, String filePath) throws SSHApiException, IOException, JSchException {
+        /**
+         * scp remote client file to airavata local dir.
+         */
+        SSHUtils.scpFrom(sourceURI.getPath(), filePath, sshSession);
 
-		/**
-		 * scp local file to compute resource.
-		 */
-		taskContext.getParentProcessContext().getRemoteCluster().scpTo(filePath, destinationURI.getPath());
-	}
+        /**
+         * scp local file to compute resource.
+         */
+        taskContext.getParentProcessContext().getRemoteCluster().scpTo(filePath, destinationURI.getPath());
+    }
 
-	private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI,
-	                               String filePath) throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
-		/**
-		 * scp remote file from compute resource to airavata local
-		 */
-		taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), filePath);
+    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI,
+                                   String filePath) throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
+        /**
+         * scp remote file from compute resource to airavata local
+         */
+        taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), filePath);
 
-		/**
-		 * scp local file to remote client
-		 */
-		SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession);
+        /**
+         * scp local file to remote client
+         */
+        SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession);
         // update output locations
         GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
         GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
 
     }
 
-	@Override
-	public TaskStatus recover(TaskContext taskContext) {
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
         TaskState state = taskContext.getTaskStatus().getState();
         if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
             return execute(taskContext);
@@ -313,24 +323,24 @@ public class AdvancedSCPDataStageTask implements Task{
         }
     }
 
-	@Override
-	public TaskTypes getType() {
-		return TaskTypes.DATA_STAGING;
-	}
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.DATA_STAGING;
+    }
 
-	private SSHPasswordAuthentication getSSHPasswordAuthentication() {
-		return new SSHPasswordAuthentication(userName, password);
-	}
+    private SSHPasswordAuthentication getSSHPasswordAuthentication() {
+        return new SSHPasswordAuthentication(userName, password);
+    }
 
-	private  SSHKeyAuthentication getSSHKeyAuthentication(){
-		SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
-		sshKA.setUserName(userName);
-		sshKA.setPassphrase(passPhrase);
-		sshKA.setPrivateKeyFilePath(privateKeyPath);
-		sshKA.setPublicKeyFilePath(publicKeyPath);
-		sshKA.setStrictHostKeyChecking("no");
-		return sshKA;
-	}
+    private SSHKeyAuthentication getSSHKeyAuthentication() {
+        SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+        sshKA.setUserName(userName);
+        sshKA.setPassphrase(passPhrase);
+        sshKA.setPrivateKeyFilePath(privateKeyPath);
+        sshKA.setPublicKeyFilePath(publicKeyPath);
+        sshKA.setStrictHostKeyChecking("no");
+        return sshKA;
+    }
 
     private String writeFileToDisk(byte[] data) {
         File temp = null;