You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/09 21:25:58 UTC

[2/2] airavata git commit: Added Local batch job submission support and renamed task implementtions to have generic names

Added Local batch job submission support and renamed task implementtions to have generic names


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dc1be312
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dc1be312
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dc1be312

Branch: refs/heads/master
Commit: dc1be3126409992f6de94d76e0fc4834540e9e9c
Parents: c62f74a
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Nov 9 15:25:52 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Nov 9 15:25:52 2015 -0500

----------------------------------------------------------------------
 .../server/src/main/resources/gfac-config.yaml  |  11 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |  16 +-
 .../airavata/gfac/impl/GFacEngineImpl.java      |   4 +-
 .../airavata/gfac/impl/LocalCommandOutput.java  |   2 +-
 .../impl/task/AdvancedSCPDataStageTask.java     | 344 -----------------
 .../airavata/gfac/impl/task/DataStageTask.java  | 126 +++++++
 .../impl/task/DefaultJobSubmissionTask.java     | 286 +++++++++++++++
 .../gfac/impl/task/EnvironmentSetupTask.java    |  74 ++++
 .../gfac/impl/task/ForkJobSubmissionTask.java   | 184 ++++++++++
 .../gfac/impl/task/SCPDataStageTask.java        | 365 +++++++++++++++----
 .../gfac/impl/task/SSHEnvironmentSetupTask.java |  74 ----
 .../impl/task/SSHForkJobSubmissionTask.java     | 184 ----------
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 286 ---------------
 13 files changed, 980 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/configuration/server/src/main/resources/gfac-config.yaml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml
index 8dafe09..dff4062 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.yaml
+++ b/modules/configuration/server/src/main/resources/gfac-config.yaml
@@ -20,7 +20,7 @@
 
 jobSubmitters:
   - submissionProtocol: SSH
-    taskClass: org.apache.airavata.gfac.impl.task.SSHJobSubmissionTask
+    taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask
 #   properties:
 #     - userName: airavata
 #       passPhrase: airavata
@@ -29,10 +29,11 @@ jobSubmitters:
 #       hostName: remote.client.hostName
 
   - submissionProtocol: SSH_FORK
-    taskClass: org.apache.airavata.gfac.impl.task.SSHForkJobSubmissionTask
+    taskClass: org.apache.airavata.gfac.impl.task.ForkJobSubmissionTask
 
   - submissionProtocol: LOCAL
-    taskClass: org.apache.airavata.gfac.impl.task.LocalJobSubmissionTask
+    taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask
+
 
 # Following job subbmitters are not yet implemented.
 
@@ -47,14 +48,14 @@ commonTasks:
 
 fileTransferTasks:
   - transferProtocol: SCP
-    taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
+    taskClass: org.apache.airavata.gfac.impl.task.DataStageTask
 
 # If your client doen't run the same instance where airavata server is running then you need to comment above
 # SCPDataStageTask and uncomment AdvancedSCPDataStageTask. To work with AdvancedSCPDataStageTask, you either need to
 # provide ssh keys or password.
 
 #  - transferProtocol: SCP
-#    taskClass: org.apache.airavata.gfac.impl.task.AdvancedSCPDataStageTask
+#    taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
 #    properties:
 #     - userName: airavata
 #       passPhrase: airavata

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index e2966c5..4dc63e6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -207,11 +207,17 @@ public abstract class Factory {
         String computeResourceId = processContext.getComputeResourceId();
         String key = processContext.getJobSubmissionProtocol().toString() + ":" + computeResourceId;
 		RemoteCluster remoteCluster = remoteClusterMap.get(key);
-		if (remoteCluster == null) {
-			JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
-			AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
-			remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo);
-			remoteClusterMap.put(key, remoteCluster);
+		JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol();
+        if (remoteCluster == null) {
+            JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+            if (jobSubmissionProtocol == JobSubmissionProtocol.LOCAL) {
+                remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null);
+            } else if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
+                    jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+                AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
+                remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo);
+            }
+            remoteClusterMap.put(key, remoteCluster);
 		}
 		return remoteCluster;
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 6444eb4..d386f66 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -35,7 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
+import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
@@ -356,7 +356,7 @@ public class GFacEngineImpl implements GFacEngine {
             EnvironmentSetupTaskModel subTaskModel = (EnvironmentSetupTaskModel) taskContext.getSubTaskModel();
             Task envSetupTask = null;
             if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) {
-                envSetupTask = new SSHEnvironmentSetupTask();
+                envSetupTask = new EnvironmentSetupTask();
             } else {
                 throw new GFacException("Unsupported security protocol, Airavata doesn't support " +
                         subTaskModel.getProtocol().name() + " protocol yet.");

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
index e9d683d..4d98423 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
@@ -55,6 +55,6 @@ public class LocalCommandOutput implements CommandOutput {
 
     @Override
     public int getExitCode() {
-        return 0;
+        return process.exitValue();
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
deleted file mode 100644
index 6c4e14e..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.credential.store.credential.Credential;
-import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
-import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
-import org.apache.airavata.gfac.core.cluster.CommandInfo;
-import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This will be used for both Input file staging and output file staging, hence if you do any changes to a part of logic
- * in this class please consider that will works with both input and output cases.
- */
-public class AdvancedSCPDataStageTask implements Task {
-    private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
-    private static final int DEFAULT_SSH_PORT = 22;
-    private String password;
-    private String publicKeyPath;
-    private String passPhrase;
-    private String privateKeyPath;
-    private String userName;
-    private String hostName;
-    private String inputPath;
-
-    @Override
-    public void init(Map<String, String> propertyMap) throws TaskException {
-        inputPath = propertyMap.get("inputPath");
-        hostName = propertyMap.get("hostName");
-        userName = propertyMap.get("userName");
-    }
-
-    @Override
-    public TaskStatus execute(TaskContext taskContext) {
-        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
-        AuthenticationInfo authenticationInfo = null;
-        DataStagingTaskModel subTaskModel = null;
-        String localDataDir = null;
-        ProcessState processState = taskContext.getParentProcessContext().getProcessState();
-        try {
-            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
-                    (taskContext.getTaskModel());
-            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-                OutputDataObjectType processOutput = taskContext.getProcessOutput();
-                if (processOutput != null && processOutput.getValue() == null) {
-                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
-                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
-                            processOutput.getName());
-                    status = new TaskStatus(TaskState.FAILED);
-                    if (processOutput.isIsRequired()) {
-                        status.setReason("File name is null, but this output's isRequired bit is not set");
-                    } else {
-                        status.setReason("File name is null");
-                    }
-                    return status;
-                }
-            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
-                InputDataObjectType processInput = taskContext.getProcessInput();
-                if (processInput != null && processInput.getValue() == null) {
-                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
-                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
-                            processInput.getName());
-                    status = new TaskStatus(TaskState.FAILED);
-                    if (processInput.isIsRequired()) {
-                        status.setReason("File name is null, but this input's isRequired bit is not set");
-                    } else {
-                        status.setReason("File name is null");
-                    }
-                    return status;
-                }
-            } else {
-                status.setState(TaskState.FAILED);
-                status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
-                        "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
-                return status;
-            }
-
-            // use rsync instead of scp if source and destination host and user name is same.
-            URI sourceURI = new URI(subTaskModel.getSource());
-            String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
-                    sourceURI.getPath().length());
-            URI destinationURI = null;
-            if (subTaskModel.getDestination().startsWith("dummy")) {
-                destinationURI = getDestinationURI(taskContext, fileName);
-                subTaskModel.setDestination(destinationURI.toString());
-            } else {
-                destinationURI = new URI(subTaskModel.getDestination());
-            }
-
-            if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
-                    && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
-                localDataCopy(taskContext, sourceURI, destinationURI);
-                status.setState(TaskState.COMPLETED);
-                status.setReason("Locally copied file using 'cp' command ");
-                return status;
-            }
-
-
-            String tokenId = taskContext.getParentProcessContext().getTokenId();
-            CredentialReader credentialReader = GFacUtils.getCredentialReader();
-            Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
-            if (credential instanceof SSHCredential) {
-                SSHCredential sshCredential = (SSHCredential) credential;
-                byte[] publicKey = sshCredential.getPublicKey();
-                publicKeyPath = writeFileToDisk(publicKey);
-                byte[] privateKey = sshCredential.getPrivateKey();
-                privateKeyPath = writeFileToDisk(privateKey);
-                passPhrase = sshCredential.getPassphrase();
-//                userName = sshCredential.getPortalUserName(); // this might not same as login user name
-                authenticationInfo = getSSHKeyAuthentication();
-            } else {
-                String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
-                log.error(msg);
-                status.setState(TaskState.FAILED);
-                status.setReason(msg);
-                ErrorModel errorModel = new ErrorModel();
-                errorModel.setActualErrorMessage(msg);
-                errorModel.setUserFriendlyMessage(msg);
-                taskContext.getTaskModel().setTaskError(errorModel);
-                return status;
-            }
-            status = new TaskStatus(TaskState.COMPLETED);
-
-            ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
-            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
-            if (processState == ProcessState.INPUT_DATA_STAGING) {
-                inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
-                status.setReason("Successfully staged input data");
-            } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-                String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
-                SSHUtils.makeDirectory(targetPath, sshSession);
-                // TODO - save updated subtask model with new destination
-                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
-                status.setReason("Successfully staged output data");
-            }
-        } catch (TException e) {
-            String msg = "Couldn't create subTask model thrift model";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-            return status;
-        } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
-            String msg = "Failed while reading credentials";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (URISyntaxException e) {
-            String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
-                    "destination : " + subTaskModel.getDestination();
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (SSHApiException e) {
-            String msg = "Failed to do scp with compute resource";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (AiravataException e) {
-            String msg = "Error while creating ssh session with client";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (JSchException | IOException e) {
-            String msg = "Failed to do scp with client";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (GFacException e) {
-            String msg = "Failed update experiment and process inputs and outputs";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        }
-        return status;
-    }
-
-    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
-        StringBuilder sb = new StringBuilder("rsync -cr ");
-        sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
-        CommandInfo commandInfo = new RawCommandInfo(sb.toString());
-        taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
-    }
-
-    private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
-            destinationURI) throws SSHApiException, IOException, JSchException {
-        /**
-         * scp third party file transfer 'to' compute resource.
-         */
-        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
-                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
-    }
-
-    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
-            throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
-
-        /**
-         * scp third party file transfer 'from' comute resource.
-         */
-        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
-                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
-        // update output locations
-        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
-        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
-
-    }
-
-    @Override
-    public TaskStatus recover(TaskContext taskContext) {
-        TaskState state = taskContext.getTaskStatus().getState();
-        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
-            return execute(taskContext);
-        } else {
-            // files already transferred or failed
-            return taskContext.getTaskStatus();
-        }
-    }
-
-    @Override
-    public TaskTypes getType() {
-        return TaskTypes.DATA_STAGING;
-    }
-
-    private SSHPasswordAuthentication getSSHPasswordAuthentication() {
-        return new SSHPasswordAuthentication(userName, password);
-    }
-
-    private SSHKeyAuthentication getSSHKeyAuthentication() {
-        SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
-        sshKA.setUserName(userName);
-        sshKA.setPassphrase(passPhrase);
-        sshKA.setPrivateKeyFilePath(privateKeyPath);
-        sshKA.setPublicKeyFilePath(publicKeyPath);
-        sshKA.setStrictHostKeyChecking("no");
-        return sshKA;
-    }
-
-    private String writeFileToDisk(byte[] data) {
-        File temp = null;
-        try {
-            temp = File.createTempFile("id_rsa", "");
-            //write it
-            FileOutputStream bw = new FileOutputStream(temp);
-            bw.write(data);
-            bw.close();
-        } catch (IOException e) {
-            log.error(e.getMessage(), e);
-        }
-        return temp.getAbsolutePath();
-    }
-
-    public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
-        String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
-                taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
-        return new URI("SCP", hostName, filePath, null);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
new file mode 100644
index 0000000..ab9d562
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class DataStageTask implements Task {
+	private static final Logger log = LoggerFactory.getLogger(DataStageTask.class);
+
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+
+	}
+
+	@Override
+	public TaskStatus execute(TaskContext taskContext) {
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+			status.setState(TaskState.FAILED);
+			status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+					+ taskContext.getTaskModel().getTaskType().toString());
+		} else {
+			try {
+				DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+						.getTaskModel());
+				URI sourceURI = new URI(subTaskModel.getSource());
+				URI destinationURI = new URI(subTaskModel.getDestination());
+
+				ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+				if (processState == ProcessState.INPUT_DATA_STAGING) {
+					/**
+					 * copy local file to compute resource.
+					 */
+					taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+							.getPath());
+				} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+					/**
+					 * copy remote file from compute resource.
+					 */
+					taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+							.getPath());
+				}
+				status.setReason("Successfully staged data");
+			} catch (SSHApiException e) {
+				String msg = "Scp attempt failed";
+				log.error(msg, e);
+				status.setState(TaskState.FAILED);
+				status.setReason(msg);
+				ErrorModel errorModel = new ErrorModel();
+				errorModel.setActualErrorMessage(e.getMessage());
+				errorModel.setUserFriendlyMessage(msg);
+				taskContext.getTaskModel().setTaskError(errorModel);
+			} catch (TException e) {
+				String msg = "Invalid task invocation";
+				log.error(msg, e);
+				status.setState(TaskState.FAILED);
+				status.setReason(msg);
+				ErrorModel errorModel = new ErrorModel();
+				errorModel.setActualErrorMessage(e.getMessage());
+				errorModel.setUserFriendlyMessage(msg);
+				taskContext.getTaskModel().setTaskError(errorModel);
+			} catch (URISyntaxException e) {
+				String msg = "source or destination is not a valid URI";
+				log.error(msg, e);
+				status.setState(TaskState.FAILED);
+				status.setReason(msg);
+				ErrorModel errorModel = new ErrorModel();
+				errorModel.setActualErrorMessage(e.getMessage());
+				errorModel.setUserFriendlyMessage(msg);
+				taskContext.getTaskModel().setTaskError(errorModel);
+			}
+		}
+		return status;
+	}
+
+	@Override
+	public TaskStatus recover(TaskContext taskContext) {
+        TaskState state = taskContext.getTaskStatus().getState();
+        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+            return execute(taskContext);
+        } else {
+            // files already transferred or failed
+            return taskContext.getTaskStatus();
+        }
+	}
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.DATA_STAGING;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..020880d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class DefaultJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext){
+	    TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
+	    try {
+		    ProcessContext processContext = taskContext.getParentProcessContext();
+		    JobModel jobModel = processContext.getJobModel();
+		    jobModel.setTaskId(taskContext.getTaskId());
+		    RemoteCluster remoteCluster = processContext.getRemoteCluster();
+		    JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
+		    jobModel.setJobName(jobDescriptor.getJobName());
+		    ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+		    JobManagerConfiguration jConfig = null;
+		    if (resourceJobManager != null) {
+			    jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+		    }
+		    JobStatus jobStatus = new JobStatus();
+		    File jobFile = GFacUtils.createJobFile(taskContext, jobDescriptor, jConfig);
+		    if (jobFile != null && jobFile.exists()) {
+			    jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+			    JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+					    processContext.getWorkingDir());
+			    jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+			    jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+			    jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+			    String jobId = jobSubmissionOutput.getJobId();
+			    if (jobId != null && !jobId.isEmpty()) {
+				    jobModel.setJobId(jobId);
+				    GFacUtils.saveJobModel(processContext, jobModel);
+				    jobStatus.setJobState(JobState.SUBMITTED);
+				    jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+						    .getComputeResourceDescription().getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+				    jobModel.setJobStatus(jobStatus);
+				    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+				    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+					    jobStatus.setJobState(JobState.QUEUED);
+					    jobStatus.setReason("Verification step succeeded");
+                        jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+					    jobModel.setJobStatus(jobStatus);
+					    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+				    }
+				    taskStatus = new TaskStatus(TaskState.COMPLETED);
+				    taskStatus.setReason("Submitted job to compute resource");
+                    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+			    } else {
+				    int verificationTryCount = 0;
+				    while (verificationTryCount++ < 3) {
+					    String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+					    if (verifyJobId != null && !verifyJobId.isEmpty()) {
+						    // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+						    jobId = verifyJobId;
+						    jobModel.setJobId(jobId);
+						    GFacUtils.saveJobModel(processContext,jobModel);
+						    jobStatus.setJobState(JobState.QUEUED);
+						    jobStatus.setReason("Verification step succeeded");
+                            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+						    jobModel.setJobStatus(jobStatus);
+						    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+						    taskStatus.setState(TaskState.COMPLETED);
+						    taskStatus.setReason("Submitted job to compute resource");
+                            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+						    break;
+					    }
+					    log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+					    Thread.sleep(verificationTryCount * 10000);
+				    }
+			    }
+
+			    if (jobId == null || jobId.isEmpty()) {
+				    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+						    "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+						    "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+				    log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setUserFriendlyMessage(msg);
+                    errorModel.setActualErrorMessage(msg);
+				    GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+				    taskStatus.setState(TaskState.FAILED);
+				    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+                    taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+			    }else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+		    } else {
+			    taskStatus.setState(TaskState.FAILED);
+			    if (jobFile == null) {
+				    taskStatus.setReason("JobFile is null");
+			    } else {
+				    taskStatus.setReason("Job file doesn't exist");
+			    }
+		    }
+
+	    } catch (AppCatalogException e) {
+		    String msg = "Error while instantiating app catalog";
+		    log.error(msg, e);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (ApplicationSettingsException e) {
+		    String msg = "Error occurred while creating job descriptor";
+		    log.error(msg, e);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (GFacException e) {
+		    String msg = "Error occurred while creating job descriptor";
+		    log.error(msg, e);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (SSHApiException e) {
+		    String msg = "Error occurred while submitting the job";
+		    log.error(msg, e);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (IOException e) {
+		    String msg = "Error while reading the content of the job file";
+		    log.error(msg, e);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (InterruptedException e) {
+		    String msg = "Error occurred while verifying the job submission";
+		    log.error(msg, e);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    }
+
+	    taskContext.setTaskStatus(taskStatus);
+	    try {
+		    GFacUtils.saveAndPublishTaskStatus(taskContext);
+	    } catch (GFacException e) {
+		    log.error("Error while saving task status", e);
+	    }
+	    return taskStatus;
+    }
+
+    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {
+        JobStatus status = remoteCluster.getJobStatus(jobID);
+        return status != null &&  status.getJobState() != JobState.UNKNOWN;
+    }
+
+    private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) {
+        String jobName = jobDetails.getJobName();
+        String jobId = null;
+        try {
+            jobId  = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName());
+        } catch (SSHApiException e) {
+            log.error("Error while verifying JobId from JobName");
+        }
+        return jobId;
+    }
+
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            // original job failed before submitting
+            if (jobModel == null || jobModel.getJobId() == null ){
+                return execute(taskContext);
+            }else {
+	            // job is already submitted and monitor should handle the recovery
+	            return new TaskStatus(TaskState.COMPLETED);
+            }
+    }
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.JOB_SUBMISSION;
+	}
+
+	@Override
+	public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+		ProcessContext processContext = taskcontext.getParentProcessContext();
+		RemoteCluster remoteCluster = processContext.getRemoteCluster();
+		JobModel jobModel = processContext.getJobModel();
+		int retryCount = 0;
+		if (jobModel != null) {
+			try {
+				JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+				while (oldJobStatus == null && retryCount <= 5) {
+					retryCount++;
+					Thread.sleep(retryCount * 1000);
+					oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+				}
+				if (oldJobStatus != null) {
+					oldJobStatus = remoteCluster.cancelJob(jobModel.getJobId());
+					return oldJobStatus;
+				} else {
+					throw new TaskException("Cancel operation failed, Job status couldn't find in resource, JobId " +
+							jobModel.getJobId());
+				}
+			} catch (SSHApiException | InterruptedException e) {
+				throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e);
+			}
+		} else {
+			throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
new file mode 100644
index 0000000..fff130c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EnvironmentSetupTask implements Task {
+
+	private static final Logger log = LoggerFactory.getLogger(EnvironmentSetupTask.class);
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+
+	}
+
+	@Override
+	public TaskStatus execute(TaskContext taskContext) {
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+		try {
+			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
+			remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+			status.setReason("Successfully created environment");
+		} catch (SSHApiException e) {
+			String msg = "Error while environment setup";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
+		}
+		return status;
+	}
+
+	@Override
+	public TaskStatus recover(TaskContext taskContext) {
+		return execute(taskContext);
+	}
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.ENV_SETUP;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..ed75fef
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class ForkJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(ForkJobSubmissionTask.class);
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            jobModel.setTaskId(taskContext.getTaskId());
+            RemoteCluster remoteCluster = processContext.getRemoteCluster();
+            JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
+            jobModel.setJobName(jobDescriptor.getJobName());
+            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+            if (resourceJobManager != null) {
+                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+            }
+            JobStatus jobStatus = new JobStatus();
+	        File jobFile = GFacUtils.createJobFile(taskContext, jobDescriptor, jConfig);
+	        if (jobFile != null && jobFile.exists()) {
+                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+	            JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+			            processContext.getWorkingDir());
+	            jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+	            jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+	            jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+	            String jobId = jobSubmissionOutput.getJobId();
+	            if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+                            .getComputeResourceDescription().getHostName());
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatus(jobStatus);
+                    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    taskStatus = new TaskStatus(TaskState.COMPLETED);
+                    taskStatus.setReason("Submitted job to compute resource");
+                }
+                if (jobId == null || jobId.isEmpty()) {
+                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                            "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+                            "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+                    log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setActualErrorMessage(msg);
+                    errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+                    GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+                    taskStatus.setState(TaskState.FAILED);
+                    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+                }else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+            } else {
+                taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    taskStatus.setReason("JobFile is null");
+                } else {
+                    taskStatus.setReason("Job file doesn't exist");
+                }
+            }
+        } catch (ApplicationSettingsException e) {
+            String msg = "Error occurred while creating job descriptor";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (AppCatalogException e) {
+            String msg = "Error while instantiating app catalog";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (GFacException e) {
+            String msg = "Error occurred while creating job descriptor";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (SSHApiException e) {
+            String msg = "Error occurred while submitting the job";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (IOException e) {
+            String msg = "Error while reading the content of the job file";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        }
+        return taskStatus;
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        //TODO implement recovery scenario instead of calling execute.
+        return execute(taskContext);
+    }
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.JOB_SUBMISSION;
+	}
+
+	@Override
+	public JobStatus cancel(TaskContext taskcontext) {
+		// TODO - implement cancel with SSH Fork
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 32ee31b..678ded1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -20,11 +20,32 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.TaskState;
@@ -35,81 +56,240 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
 
+/**
+ * This will be used for both Input file staging and output file staging, hence if you do any changes to a part of logic
+ * in this class please consider that will works with both input and output cases.
+ */
 public class SCPDataStageTask implements Task {
-	private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
-
-	@Override
-	public void init(Map<String, String> propertyMap) throws TaskException {
-
-	}
-
-	@Override
-	public TaskStatus execute(TaskContext taskContext) {
-		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
-		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
-			status.setState(TaskState.FAILED);
-			status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
-					+ taskContext.getTaskModel().getTaskType().toString());
-		} else {
-			try {
-				DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
-						.getTaskModel());
-				URI sourceURI = new URI(subTaskModel.getSource());
-				URI destinationURI = new URI(subTaskModel.getDestination());
-
-				ProcessState processState = taskContext.getParentProcessContext().getProcessState();
-				if (processState == ProcessState.INPUT_DATA_STAGING) {
-					/**
-					 * copy local file to compute resource.
-					 */
-					taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
-							.getPath());
-				} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-					/**
-					 * copy remote file from compute resource.
-					 */
-					taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
-							.getPath());
-				}
-				status.setReason("Successfully staged data");
-			} catch (SSHApiException e) {
-				String msg = "Scp attempt failed";
-				log.error(msg, e);
-				status.setState(TaskState.FAILED);
-				status.setReason(msg);
-				ErrorModel errorModel = new ErrorModel();
-				errorModel.setActualErrorMessage(e.getMessage());
-				errorModel.setUserFriendlyMessage(msg);
-				taskContext.getTaskModel().setTaskError(errorModel);
-			} catch (TException e) {
-				String msg = "Invalid task invocation";
-				log.error(msg, e);
-				status.setState(TaskState.FAILED);
-				status.setReason(msg);
-				ErrorModel errorModel = new ErrorModel();
-				errorModel.setActualErrorMessage(e.getMessage());
-				errorModel.setUserFriendlyMessage(msg);
-				taskContext.getTaskModel().setTaskError(errorModel);
-			} catch (URISyntaxException e) {
-				String msg = "source or destination is not a valid URI";
-				log.error(msg, e);
-				status.setState(TaskState.FAILED);
-				status.setReason(msg);
-				ErrorModel errorModel = new ErrorModel();
-				errorModel.setActualErrorMessage(e.getMessage());
-				errorModel.setUserFriendlyMessage(msg);
-				taskContext.getTaskModel().setTaskError(errorModel);
-			}
-		}
-		return status;
-	}
-
-	@Override
-	public TaskStatus recover(TaskContext taskContext) {
+    private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+    private String password;
+    private String publicKeyPath;
+    private String passPhrase;
+    private String privateKeyPath;
+    private String userName;
+    private String hostName;
+    private String inputPath;
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        inputPath = propertyMap.get("inputPath");
+        hostName = propertyMap.get("hostName");
+        userName = propertyMap.get("userName");
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+        AuthenticationInfo authenticationInfo = null;
+        DataStagingTaskModel subTaskModel = null;
+        String localDataDir = null;
+        ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+        try {
+            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+                    (taskContext.getTaskModel());
+            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                OutputDataObjectType processOutput = taskContext.getProcessOutput();
+                if (processOutput != null && processOutput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processOutput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processOutput.isIsRequired()) {
+                        status.setReason("File name is null, but this output's isRequired bit is not set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+                InputDataObjectType processInput = taskContext.getProcessInput();
+                if (processInput != null && processInput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processInput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processInput.isIsRequired()) {
+                        status.setReason("File name is null, but this input's isRequired bit is not set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else {
+                status.setState(TaskState.FAILED);
+                status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+                        "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+                return status;
+            }
+
+            // use rsync instead of scp if source and destination host and user name is same.
+            URI sourceURI = new URI(subTaskModel.getSource());
+            String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+                    sourceURI.getPath().length());
+            URI destinationURI = null;
+            if (subTaskModel.getDestination().startsWith("dummy")) {
+                destinationURI = getDestinationURI(taskContext, fileName);
+                subTaskModel.setDestination(destinationURI.toString());
+            } else {
+                destinationURI = new URI(subTaskModel.getDestination());
+            }
+
+            if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+                    && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+                localDataCopy(taskContext, sourceURI, destinationURI);
+                status.setState(TaskState.COMPLETED);
+                status.setReason("Locally copied file using 'cp' command ");
+                return status;
+            }
+
+
+            String tokenId = taskContext.getParentProcessContext().getTokenId();
+            CredentialReader credentialReader = GFacUtils.getCredentialReader();
+            Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
+            if (credential instanceof SSHCredential) {
+                SSHCredential sshCredential = (SSHCredential) credential;
+                byte[] publicKey = sshCredential.getPublicKey();
+                publicKeyPath = writeFileToDisk(publicKey);
+                byte[] privateKey = sshCredential.getPrivateKey();
+                privateKeyPath = writeFileToDisk(privateKey);
+                passPhrase = sshCredential.getPassphrase();
+//                userName = sshCredential.getPortalUserName(); // this might not same as login user name
+                authenticationInfo = getSSHKeyAuthentication();
+            } else {
+                String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
+                log.error(msg);
+                status.setState(TaskState.FAILED);
+                status.setReason(msg);
+                ErrorModel errorModel = new ErrorModel();
+                errorModel.setActualErrorMessage(msg);
+                errorModel.setUserFriendlyMessage(msg);
+                taskContext.getTaskModel().setTaskError(errorModel);
+                return status;
+            }
+            status = new TaskStatus(TaskState.COMPLETED);
+
+            ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+            if (processState == ProcessState.INPUT_DATA_STAGING) {
+                inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+                status.setReason("Successfully staged input data");
+            } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
+                SSHUtils.makeDirectory(targetPath, sshSession);
+                // TODO - save updated subtask model with new destination
+                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+                status.setReason("Successfully staged output data");
+            }
+        } catch (TException e) {
+            String msg = "Couldn't create subTask model thrift model";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+            return status;
+        } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
+            String msg = "Failed while reading credentials";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (URISyntaxException e) {
+            String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
+                    "destination : " + subTaskModel.getDestination();
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (SSHApiException e) {
+            String msg = "Failed to do scp with compute resource";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (AiravataException e) {
+            String msg = "Error while creating ssh session with client";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (JSchException | IOException e) {
+            String msg = "Failed to do scp with client";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (GFacException e) {
+            String msg = "Failed update experiment and process inputs and outputs";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        }
+        return status;
+    }
+
+    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
+        StringBuilder sb = new StringBuilder("rsync -cr ");
+        sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
+        CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+        taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+    }
+
+    private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+            destinationURI) throws SSHApiException, IOException, JSchException {
+        /**
+         * scp third party file transfer 'to' compute resource.
+         */
+        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
+    }
+
+    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+            throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
+
+        /**
+         * scp third party file transfer 'from' comute resource.
+         */
+        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
+        // update output locations
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
         TaskState state = taskContext.getTaskStatus().getState();
         if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
             return execute(taskContext);
@@ -117,10 +297,45 @@ public class SCPDataStageTask implements Task {
             // files already transferred or failed
             return taskContext.getTaskStatus();
         }
-	}
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.DATA_STAGING;
+    }
+
+    private SSHPasswordAuthentication getSSHPasswordAuthentication() {
+        return new SSHPasswordAuthentication(userName, password);
+    }
+
+    private SSHKeyAuthentication getSSHKeyAuthentication() {
+        SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+        sshKA.setUserName(userName);
+        sshKA.setPassphrase(passPhrase);
+        sshKA.setPrivateKeyFilePath(privateKeyPath);
+        sshKA.setPublicKeyFilePath(publicKeyPath);
+        sshKA.setStrictHostKeyChecking("no");
+        return sshKA;
+    }
+
+    private String writeFileToDisk(byte[] data) {
+        File temp = null;
+        try {
+            temp = File.createTempFile("id_rsa", "");
+            //write it
+            FileOutputStream bw = new FileOutputStream(temp);
+            bw.write(data);
+            bw.close();
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+        return temp.getAbsolutePath();
+    }
+
+    public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
+        String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+                taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+        return new URI("SCP", hostName, filePath, null);
 
-	@Override
-	public TaskTypes getType() {
-		return TaskTypes.DATA_STAGING;
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
deleted file mode 100644
index d28ae3f..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.TaskTypes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class SSHEnvironmentSetupTask implements Task {
-
-	private static final Logger log = LoggerFactory.getLogger(SSHEnvironmentSetupTask.class);
-	@Override
-	public void init(Map<String, String> propertyMap) throws TaskException {
-
-	}
-
-	@Override
-	public TaskStatus execute(TaskContext taskContext) {
-		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
-		try {
-			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
-			remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
-			status.setReason("Successfully created environment");
-		} catch (SSHApiException e) {
-			String msg = "Error while environment setup";
-			log.error(msg, e);
-			status.setState(TaskState.FAILED);
-			status.setReason(msg);
-			ErrorModel errorModel = new ErrorModel();
-			errorModel.setActualErrorMessage(e.getMessage());
-			errorModel.setUserFriendlyMessage(msg);
-			taskContext.getTaskModel().setTaskError(errorModel);
-		}
-		return status;
-	}
-
-	@Override
-	public TaskStatus recover(TaskContext taskContext) {
-		return execute(taskContext);
-	}
-
-	@Override
-	public TaskTypes getType() {
-		return TaskTypes.ENV_SETUP;
-	}
-}