You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/04/13 18:47:54 UTC

[6/6] airavata git commit: Adding Data staging task implementation and related utils

Adding Data staging task implementation and related utils


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ea865f23
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ea865f23
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ea865f23

Branch: refs/heads/feature-workload-mgmt
Commit: ea865f23709778addf03480b10c5a9e80b2294f3
Parents: fbc0351
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:38:31 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:38:31 2017 -0400

----------------------------------------------------------------------
 modules/worker/task-datastaging/pom.xml         |  72 +++++
 .../task/datastaging/handler/DataStageTask.java | 125 ++++++++
 .../datastaging/handler/SCPDataStageTask.java   | 292 +++++++++++++++++++
 .../datastaging/utils/DataStagingFactory.java   |  30 ++
 .../datastaging/utils/DataStagingUtils.java     | 107 +++++++
 5 files changed, 626 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/pom.xml b/modules/worker/task-datastaging/pom.xml
new file mode 100644
index 0000000..c046f45
--- /dev/null
+++ b/modules/worker/task-datastaging/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Build descriptor for the task-datastaging module (DataStageTask, SCPDataStageTask,
+  DataStagingFactory and DataStagingUtils). Version and shared build settings come from the
+  airavata-worker parent POM referenced below.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata-worker</artifactId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>airavata-data-staging</artifactId>
+    <name>Airavata Task - Data Staging</name>
+    <description>I/O Data staging task implementation.</description>
+    <url>http://airavata.apache.org/</url>
+
+    <!-- Versions of the SSH libraries declared below. -->
+    <properties>
+        <jcraft.version>0.1.53</jcraft.version>
+        <net.schmizz.version>0.6.1</net.schmizz.version>
+    </properties>
+
+    <!--
+      NOTE(review): DataStagingUtils imports org.apache.airavata.registry.* classes, but no registry
+      artifact is declared here; presumably it arrives transitively (e.g. via airavata-worker-core).
+      Confirm before relying on it.
+    -->
+    <dependencies>
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <!-- JSch: SCPDataStageTask uses com.jcraft.jsch.Session and JSchException. -->
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>${jcraft.version}</version>
+        </dependency>
+        <!-- NOTE(review): sshj is not imported by any class in this commit; confirm it is still needed. -->
+        <dependency>
+            <groupId>net.schmizz</groupId>
+            <artifactId>sshj</artifactId>
+            <version>${net.schmizz.version}</version>
+        </dependency>
+        <!-- Thrift model classes (org.apache.airavata.model.*) used by the tasks; presumably provided here. -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-data-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Worker core: Task, TaskContext, ProcessContext, RemoteCluster, WorkerFactory, WorkerException. -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-worker-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- NOTE(review): ganymed-ssh2 is not imported by any class in this commit; confirm it is still needed. -->
+        <dependency>
+            <groupId>ch.ethz.ganymed</groupId>
+            <artifactId>ganymed-ssh2</artifactId>
+            <version>262</version>
+        </dependency>
+        <!--
+          NOTE(review): groovy and groovy-templates are not imported by any class in this commit, and
+          groovy.version is not defined in this POM (presumably inherited from the parent). Confirm both.
+        -->
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-templates</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <!-- Credential Store: CredentialStoreException is thrown/caught by DataStagingFactory and SCPDataStageTask. -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-credential-store</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
new file mode 100644
index 0000000..aab6a735
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Generic data staging task that moves a single file through the parent process's data movement
+ * remote cluster: {@code copyTo} while the process is in INPUT_DATA_STAGING and {@code copyFrom}
+ * while it is in OUTPUT_DATA_STAGING. Any other process state is reported as a failure.
+ */
+public class DataStageTask implements Task {
+	private static final Logger log = LoggerFactory.getLogger(DataStageTask.class);
+
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+		// No configuration is read by this task.
+	}
+
+	/**
+	 * Stages the file described by the task's {@link DataStagingTaskModel}.
+	 *
+	 * @param taskContext context of the task; its task type must be {@link TaskTypes#DATA_STAGING}
+	 * @return {@code COMPLETED} with a success reason when the copy finishes, otherwise {@code FAILED}
+	 *         with a reason; for exceptions an {@link ErrorModel} is also recorded on the task model
+	 */
+	@Override
+	public TaskStatus execute(TaskContext taskContext) {
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+		TaskTypes taskType = taskContext.getTaskModel().getTaskType();
+		if (taskType != TaskTypes.DATA_STAGING) {
+			status.setState(TaskState.FAILED);
+			// String.valueOf keeps a missing task type from turning into a NullPointerException.
+			status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+					+ String.valueOf(taskType));
+			return status;
+		}
+		try {
+			DataStagingTaskModel subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+			URI sourceURI = new URI(subTaskModel.getSource());
+			URI destinationURI = new URI(subTaskModel.getDestination());
+
+			ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+			if (processState == ProcessState.INPUT_DATA_STAGING) {
+				// copy local file to compute resource.
+				taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(),
+						destinationURI.getPath());
+			} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+				// copy remote file from compute resource.
+				taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(),
+						destinationURI.getPath());
+			} else {
+				// No data is moved in any other phase, so this must not be reported as a successful staging.
+				status.setState(TaskState.FAILED);
+				status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name()
+						+ " and " + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found "
+						+ String.valueOf(processState));
+				return status;
+			}
+			status.setReason("Successfully staged data");
+		} catch (WorkerException e) {
+			markFailed(taskContext, status, "Scp attempt failed", e);
+		} catch (TException e) {
+			markFailed(taskContext, status, "Invalid task invocation", e);
+		} catch (URISyntaxException e) {
+			markFailed(taskContext, status, "source or destination is not a valid URI", e);
+		}
+		return status;
+	}
+
+	/**
+	 * Logs the failure, marks {@code status} as FAILED with {@code msg} as its reason and records an
+	 * {@link ErrorModel} carrying both the user-friendly and the actual error message on the task model.
+	 */
+	private static void markFailed(TaskContext taskContext, TaskStatus status, String msg, Exception e) {
+		log.error(msg, e);
+		status.setState(TaskState.FAILED);
+		status.setReason(msg);
+		ErrorModel errorModel = new ErrorModel();
+		errorModel.setActualErrorMessage(e.getMessage());
+		errorModel.setUserFriendlyMessage(msg);
+		taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+	}
+
+	/**
+	 * Re-runs {@link #execute(TaskContext)} when the task was CREATED or still EXECUTING; otherwise
+	 * returns the recorded status unchanged.
+	 */
+	@Override
+	public TaskStatus recover(TaskContext taskContext) {
+		TaskState state = taskContext.getTaskStatus().getState();
+		if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+			return execute(taskContext);
+		} else {
+			// files already transferred or failed
+			return taskContext.getTaskStatus();
+		}
+	}
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.DATA_STAGING;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
new file mode 100644
index 0000000..bd13b1e
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.CommandInfo;
+import org.apache.airavata.worker.core.cluster.RawCommandInfo;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Stages a single file between the storage resource and the compute resource. It uses a third-party
+ * SCP transfer, or a local {@code rsync} when source and destination share host and user.
+ * <p>
+ * This class is used for both input file staging and output file staging, so any change to its logic
+ * must keep both cases working.
+ */
+public class SCPDataStageTask implements Task {
+    private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        // No configuration is read by this task.
+    }
+
+    /**
+     * Validates the staging request, resolves the destination URI and transfers the file.
+     * <p>
+     * The storage host name and root path are kept in locals, not instance fields, so that one task
+     * instance does not carry state from one execution into the next.
+     *
+     * @param taskContext context carrying the {@link DataStagingTaskModel} and the parent process
+     * @return {@code COMPLETED} on success; {@code FAILED} with a reason otherwise (for exceptions an
+     *         {@link ErrorModel} is also recorded on the task model)
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+        DataStagingTaskModel subTaskModel = null;
+
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        ProcessState processState = processContext.getProcessState();
+        try {
+            subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                OutputDataObjectType processOutput = taskContext.getProcessOutput();
+                if (processOutput != null && processOutput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processOutput.getName());
+                    return nullFileNameStatus("output", processOutput.isIsRequired());
+                }
+            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+                InputDataObjectType processInput = taskContext.getProcessInput();
+                if (processInput != null && processInput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processInput.getName());
+                    return nullFileNameStatus("input", processInput.isIsRequired());
+                }
+            } else {
+                status.setState(TaskState.FAILED);
+                // String concatenation (not name()) so a null process state cannot throw here.
+                status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and "
+                        + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState);
+                return status;
+            }
+
+            StorageResourceDescription storageResource = processContext.getStorageResource();
+            if (storageResource == null) {
+                throw new WorkerException("Storage Resource is null");
+            }
+            String hostName = storageResource.getHostName();
+            String inputPath = processContext.getStorageFileSystemRootLocation();
+            if (inputPath == null) {
+                throw new WorkerException("Storage file system root location is null");
+            }
+            // Remote locations are URI paths, which always use '/', regardless of the local platform separator.
+            if (!inputPath.endsWith("/")) {
+                inputPath = inputPath + "/";
+            }
+
+            URI sourceURI = new URI(subTaskModel.getSource());
+            String sourcePath = sourceURI.getPath();
+            if (sourcePath == null) {
+                throw new URISyntaxException(subTaskModel.getSource(), "source URI has no path component");
+            }
+            String fileName = sourcePath.substring(sourcePath.lastIndexOf('/') + 1);
+            URI destinationURI;
+            if (subTaskModel.getDestination().startsWith("dummy")) {
+                // A "dummy" destination is a placeholder; compute the real storage location.
+                destinationURI = DataStagingUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+                subTaskModel.setDestination(destinationURI.toString());
+            } else {
+                destinationURI = new URI(subTaskModel.getDestination());
+            }
+
+            // use rsync instead of scp if source and destination host and user name is same.
+            if (equalsIgnoreCaseNonNull(sourceURI.getHost(), destinationURI.getHost())
+                    && equalsIgnoreCaseNonNull(sourceURI.getUserInfo(), destinationURI.getUserInfo())) {
+                localDataCopy(taskContext, sourceURI, destinationURI);
+                status.setState(TaskState.COMPLETED);
+                status.setReason("Locally copied file using 'rsync' command");
+                return status;
+            }
+
+            AuthenticationInfo authenticationInfo = DataStagingFactory.getComputerResourceSSHKeyAuthentication(processContext);
+            status = new TaskStatus(TaskState.COMPLETED);
+
+            ServerInfo serverInfo = processContext.getComputeResourceServerInfo();
+            Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
+            if (processState == ProcessState.INPUT_DATA_STAGING) {
+                inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+                status.setReason("Successfully staged input data");
+            } else {
+                // Only OUTPUT_DATA_STAGING can reach this point.
+                makeDir(taskContext, destinationURI);
+                // TODO - save updated subtask model with new destination
+                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+                status.setReason("Successfully staged output data");
+            }
+        } catch (TException e) {
+            return markFailed(taskContext, status, "Couldn't create subTask model thrift model", e);
+        } catch (ApplicationSettingsException | FileNotFoundException e) {
+            return markFailed(taskContext, status, "Failed while reading credentials", e);
+        } catch (URISyntaxException e) {
+            return markFailed(taskContext, status, "Source or destination uri is not correct source : "
+                    + subTaskModel.getSource() + ", " + "destination : " + subTaskModel.getDestination(), e);
+        } catch (CredentialStoreException e) {
+            return markFailed(taskContext, status, "Storage authentication issue, could be invalid credential token", e);
+        } catch (AiravataException e) {
+            return markFailed(taskContext, status, "Error while creating ssh session with client", e);
+        } catch (JSchException | IOException e) {
+            return markFailed(taskContext, status, "Failed to do scp with client", e);
+        } catch (WorkerException e) {
+            return markFailed(taskContext, status, "Data staging failed", e);
+        }
+        return status;
+    }
+
+    /**
+     * Builds the FAILED status returned when an input/output has a null value (file name).
+     *
+     * @param kind     "input" or "output", used in the reason text
+     * @param required whether the input/output is marked as required
+     */
+    private static TaskStatus nullFileNameStatus(String kind, boolean required) {
+        TaskStatus status = new TaskStatus(TaskState.FAILED);
+        if (required) {
+            status.setReason("File name is null, but this " + kind + "'s isRequired bit is set");
+        } else {
+            status.setReason("File name is null");
+        }
+        return status;
+    }
+
+    /** Case-insensitive equality that treats a missing (null) component as "not the same". */
+    private static boolean equalsIgnoreCaseNonNull(String a, String b) {
+        return a != null && a.equalsIgnoreCase(b);
+    }
+
+    /**
+     * Logs the failure, marks {@code status} FAILED with {@code msg} as its reason, records an
+     * {@link ErrorModel} on the task model and returns {@code status}.
+     */
+    private static TaskStatus markFailed(TaskContext taskContext, TaskStatus status, String msg, Exception e) {
+        log.error(msg, e);
+        status.setState(TaskState.FAILED);
+        status.setReason(msg);
+        ErrorModel errorModel = new ErrorModel();
+        errorModel.setActualErrorMessage(e.getMessage());
+        errorModel.setUserFriendlyMessage(msg);
+        taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        return status;
+    }
+
+    /**
+     * Copies the file with {@code rsync -cr} on the data movement remote cluster.
+     * NOTE(review): paths are concatenated into the command unquoted, so paths containing spaces or
+     * shell metacharacters will break; quoting was not added because callers may rely on globbing.
+     */
+    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws WorkerException {
+        StringBuilder sb = new StringBuilder("rsync -cr ");
+        sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
+        CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
+    }
+
+    private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+            destinationURI) throws WorkerException, IOException, JSchException {
+        // scp third party file transfer 'to' compute resource.
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, false);
+    }
+
+    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+            throws AiravataException, IOException, JSchException, WorkerException {
+        // scp third party file transfer 'from' compute resource.
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO, true);
+        OutputDataObjectType processOutput = taskContext.getProcessOutput();
+        if (processOutput == null) {
+            throw new WorkerException("Process output is null, couldn't update output locations for " + destinationURI);
+        }
+        // update output locations
+        DataStagingUtils.saveExperimentOutput(taskContext.getParentProcessContext(), processOutput.getName(), destinationURI.toString());
+        DataStagingUtils.saveProcessOutput(taskContext.getParentProcessContext(), processOutput.getName(), destinationURI.toString());
+    }
+
+    /** Creates the parent directory of {@code pathURI}'s path; does nothing for paths without a parent. */
+    private void makeDir(TaskContext taskContext, URI pathURI) throws WorkerException {
+        int endIndex = pathURI.getPath().lastIndexOf('/');
+        if (endIndex < 1) {
+            return;
+        }
+        String targetPath = pathURI.getPath().substring(0, endIndex);
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().makeDirectory(targetPath);
+    }
+
+    /**
+     * Re-runs {@link #execute(TaskContext)} when the task was CREATED or still EXECUTING; otherwise
+     * returns the recorded status unchanged.
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        TaskState state = taskContext.getTaskStatus().getState();
+        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+            return execute(taskContext);
+        } else {
+            // files already transferred or failed
+            return taskContext.getTaskStatus();
+        }
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.DATA_STAGING;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
new file mode 100644
index 0000000..51996b8
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
@@ -0,0 +1,30 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds the authentication objects needed by the data staging tasks.
+ * <p>
+ * Created by Ajinkya on 4/13/17.
+ */
+public class DataStagingFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(DataStagingFactory.class);
+
+    /**
+     * Looks up the SSH key authentication for a process's compute resource. The lookup uses the
+     * process's gateway id, compute resource login user name and compute resource credential token.
+     *
+     * @param pc process context supplying the gateway id, login user name and credential token
+     * @return the authentication object produced by {@link WorkerFactory#getSshKeyAuthentication}
+     * @throws WorkerException          wrapping a settings, illegal-access or instantiation failure
+     * @throws CredentialStoreException propagated from {@link WorkerFactory#getSshKeyAuthentication}
+     */
+    public static SSHKeyAuthentication getComputerResourceSSHKeyAuthentication(ProcessContext pc)
+            throws WorkerException, CredentialStoreException {
+        SSHKeyAuthentication authentication;
+        try {
+            authentication = WorkerFactory.getSshKeyAuthentication(
+                    pc.getGatewayId(), pc.getComputeResourceLoginUserName(), pc.getComputeResourceCredentialToken());
+        } catch (InstantiationException | IllegalAccessException | ApplicationSettingsException e) {
+            throw new WorkerException("Couldn't build ssh authentication object", e);
+        }
+        return authentication;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
new file mode 100644
index 0000000..d3d7522
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
@@ -0,0 +1,107 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Helpers for the data staging tasks: compute destination URIs on the storage resource and persist
+ * staged output locations into the experiment and replica catalogs.
+ * <p>
+ * Created by Ajinkya on 4/13/17.
+ */
+public class DataStagingUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(DataStagingUtils.class);
+
+    /** Separator for remote (URI) paths; URI paths always use '/', independent of the local platform. */
+    private static final String REMOTE_SEPARATOR = "/";
+
+    /** Port written into generated destination URIs (the SSH default). */
+    private static final int DEFAULT_SSH_PORT = 22;
+
+    /**
+     * Builds the storage-side destination URI for a staged file.
+     * <p>
+     * When the process model has an experiment data directory, an absolute directory (starting with
+     * '/') is used as is and a relative one is resolved against {@code inputPath}. Otherwise the file
+     * goes under {@code inputPath + processId + "/"}.
+     *
+     * @param taskContext task whose parent process supplies the data directory, process id and login user
+     * @param hostName    storage resource host name
+     * @param inputPath   storage root path; expected to end with '/'
+     * @param fileName    name of the file being staged
+     * @return a {@code file} URI with the storage login user, host, port 22 and the computed path
+     * @throws URISyntaxException if the components do not form a valid URI
+     */
+    public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException {
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        String experimentDataDir = processContext.getProcessModel().getExperimentDataDir();
+        String filePath;
+        if (experimentDataDir != null && !experimentDataDir.isEmpty()) {
+            if (!experimentDataDir.endsWith(REMOTE_SEPARATOR)) {
+                experimentDataDir += REMOTE_SEPARATOR;
+            }
+            if (experimentDataDir.startsWith(REMOTE_SEPARATOR)) {
+                filePath = experimentDataDir + fileName;
+            } else {
+                filePath = inputPath + experimentDataDir + fileName;
+            }
+        } else {
+            filePath = inputPath + processContext.getProcessId() + REMOTE_SEPARATOR + fileName;
+        }
+        //FIXME the port should come from the storage resource description instead of being fixed.
+        return new URI("file", processContext.getStorageResourceLoginUserName(), hostName, DEFAULT_SSH_PORT, filePath, null, null);
+    }
+
+    /**
+     * Registers a data product for the matching experiment output and stores its product URI as the
+     * output value, then updates the experiment in the experiment catalog.
+     *
+     * @param processContext process whose experiment is updated
+     * @param outputName     name of the output to update
+     * @param outputVal      staged file location recorded as the replica file path
+     * @throws WorkerException if the experiment is missing or a registry operation fails
+     */
+    public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+        try {
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            String experimentId = processContext.getExperimentId();
+            ExperimentModel experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+            if (experiment == null) {
+                throw new WorkerException("expId: " + experimentId + " processId: " + processContext.getProcessId()
+                        + " : - Experiment not found while updating experiment outputs");
+            }
+            List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+            if (experimentOutputs != null && !experimentOutputs.isEmpty()) {
+                for (OutputDataObjectType expOutput : experimentOutputs) {
+                    // Compare from outputName so an output without a name does not throw.
+                    if (outputName != null && outputName.equals(expOutput.getName())) {
+                        DataProductModel dataProductModel = new DataProductModel();
+                        dataProductModel.setGatewayId(processContext.getGatewayId());
+                        dataProductModel.setOwnerName(processContext.getProcessModel().getUserName());
+                        dataProductModel.setProductName(outputName);
+                        dataProductModel.setDataProductType(DataProductType.FILE);
+
+                        DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+                        replicaLocationModel.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+                        replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+                        replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+                        replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+                        replicaLocationModel.setFilePath(outputVal);
+                        dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+                        ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+                        String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+                        expOutput.setValue(productUri);
+                    }
+                }
+            }
+            experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating experiment outputs";
+            throw new WorkerException(msg, e);
+        }
+    }
+
+    /**
+     * Sets {@code outputVal} as the value of the matching process output and updates the process in the
+     * experiment catalog.
+     *
+     * @param processContext process whose outputs are updated
+     * @param outputName     name of the output to update
+     * @param outputVal      new value (staged file location)
+     * @throws WorkerException if a registry operation fails
+     */
+    public static void saveProcessOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+        try {
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            String processId = processContext.getProcessId();
+            // The catalog's get(...) returns Object; PROCESS_OUTPUT lookups yield a list of outputs.
+            @SuppressWarnings("unchecked")
+            List<OutputDataObjectType> processOutputs = (List<OutputDataObjectType>) experimentCatalog.get(ExperimentCatalogModelType.PROCESS_OUTPUT, processId);
+            if (processOutputs != null && !processOutputs.isEmpty()) {
+                for (OutputDataObjectType processOutput : processOutputs) {
+                    if (outputName != null && outputName.equals(processOutput.getName())) {
+                        processOutput.setValue(outputVal);
+                    }
+                }
+            }
+            ProcessModel processModel = processContext.getProcessModel();
+            processModel.setProcessOutputs(processOutputs);
+            experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating process outputs";
+            throw new WorkerException(msg, e);
+        }
+    }
+}