You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/04/13 18:47:54 UTC
[6/6] airavata git commit: Adding Data staging task implementation and related utils
Adding Data staging task implementation and related utils
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ea865f23
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ea865f23
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ea865f23
Branch: refs/heads/feature-workload-mgmt
Commit: ea865f23709778addf03480b10c5a9e80b2294f3
Parents: fbc0351
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:38:31 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:38:31 2017 -0400
----------------------------------------------------------------------
modules/worker/task-datastaging/pom.xml | 72 +++++
.../task/datastaging/handler/DataStageTask.java | 125 ++++++++
.../datastaging/handler/SCPDataStageTask.java | 292 +++++++++++++++++++
.../datastaging/utils/DataStagingFactory.java | 30 ++
.../datastaging/utils/DataStagingUtils.java | 107 +++++++
5 files changed, 626 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/pom.xml b/modules/worker/task-datastaging/pom.xml
new file mode 100644
index 0000000..c046f45
--- /dev/null
+++ b/modules/worker/task-datastaging/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-worker</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>airavata-data-staging</artifactId>
+ <name>Airavata Task - Data Staging</name>
+ <description>I/O Data staging task implementation.</description>
+ <url>http://airavata.apache.org/</url>
+
+ <properties>
+ <jcraft.version>0.1.53</jcraft.version>
+ <net.schmizz.version>0.6.1</net.schmizz.version>
+ </properties>
+
+ <dependencies>
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>${jcraft.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>net.schmizz</groupId>
+ <artifactId>sshj</artifactId>
+ <version>${net.schmizz.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-data-models</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-worker-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.ethz.ganymed</groupId>
+ <artifactId>ganymed-ssh2</artifactId>
+ <version>262</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy</artifactId>
+ <version>${groovy.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-templates</artifactId>
+ <version>${groovy.version}</version>
+ </dependency>
+ <!-- Credential Store -->
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-credential-store</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
new file mode 100644
index 0000000..aab6a735
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+public class DataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(DataStageTask.class);
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+ if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ + taskContext.getTaskModel().getTaskType().toString());
+ } else {
+ try {
+ DataStagingTaskModel subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+ URI sourceURI = new URI(subTaskModel.getSource());
+ URI destinationURI = new URI(subTaskModel.getDestination());
+
+ ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
+ /**
+ * copy local file to compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+ .getPath());
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ /**
+ * copy remote file from compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+ .getPath());
+ }
+ status.setReason("Successfully staged data");
+ } catch (WorkerException e) {
+ String msg = "Scp attempt failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (TException e) {
+ String msg = "Invalid task invocation";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (URISyntaxException e) {
+ String msg = "source or destination is not a valid URI";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ TaskState state = taskContext.getTaskStatus().getState();
+ if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+ return execute(taskContext);
+ } else {
+ // files already transferred or failed
+ return taskContext.getTaskStatus();
+ }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
new file mode 100644
index 0000000..bd13b1e
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.CommandInfo;
+import org.apache.airavata.worker.core.cluster.RawCommandInfo;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * This class is used for both input file staging and output file staging; hence, if you change any part of the
+ * logic in this class, please make sure it still works for both the input and the output cases.
+ */
+public class SCPDataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+ private static final int DEFAULT_SSH_PORT = 22;
+ private String hostName;
+ private String inputPath;
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+ AuthenticationInfo authenticationInfo = null;
+ DataStagingTaskModel subTaskModel = null;
+ String localDataDir = null;
+
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ ProcessState processState = processContext.getProcessState();
+ try {
+ subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+ if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ OutputDataObjectType processOutput = taskContext.getProcessOutput();
+ if (processOutput != null && processOutput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processOutput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processOutput.isIsRequired()) {
+ status.setReason("File name is null, but this output's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+ InputDataObjectType processInput = taskContext.getProcessInput();
+ if (processInput != null && processInput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processInput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processInput.isIsRequired()) {
+ status.setReason("File name is null, but this input's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+ "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+ return status;
+ }
+
+ StorageResourceDescription storageResource = processContext.getStorageResource();
+// StoragePreference storagePreference = taskContext.getParentProcessContext().getStoragePreference();
+
+ if (storageResource != null) {
+ hostName = storageResource.getHostName();
+ } else {
+ throw new WorkerException("Storage Resource is null");
+ }
+ inputPath = processContext.getStorageFileSystemRootLocation();
+ inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+
+ // use rsync instead of scp if source and destination host and user name is same.
+ URI sourceURI = new URI(subTaskModel.getSource());
+ String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+ URI destinationURI = null;
+ if (subTaskModel.getDestination().startsWith("dummy")) {
+ destinationURI = DataStagingUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+ subTaskModel.setDestination(destinationURI.toString());
+ } else {
+ destinationURI = new URI(subTaskModel.getDestination());
+ }
+
+ if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+ && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+ localDataCopy(taskContext, sourceURI, destinationURI);
+ status.setState(TaskState.COMPLETED);
+ status.setReason("Locally copied file using 'cp' command ");
+ return status;
+ }
+
+ authenticationInfo = DataStagingFactory.getComputerResourceSSHKeyAuthentication(processContext);
+ status = new TaskStatus(TaskState.COMPLETED);
+
+ ServerInfo serverInfo = processContext.getComputeResourceServerInfo();
+ Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
+ inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ status.setReason("Successfully staged input data");
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ makeDir(taskContext, destinationURI);
+ // TODO - save updated subtask model with new destination
+ outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ status.setReason("Successfully staged output data");
+ }
+ } catch (TException e) {
+ String msg = "Couldn't create subTask model thrift model";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ return status;
+ } catch (ApplicationSettingsException | FileNotFoundException e) {
+ String msg = "Failed while reading credentials";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (URISyntaxException e) {
+ String msg = "Source or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
+ "destination : " + subTaskModel.getDestination();
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (CredentialStoreException e) {
+ String msg = "Storage authentication issue, could be invalid credential token";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (AiravataException e) {
+ String msg = "Error while creating ssh session with client";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (JSchException | IOException e) {
+ String msg = "Failed to do scp with client";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (WorkerException e) {
+ String msg = "Data staging failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+ return status;
+ }
+
+ private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws WorkerException {
+ StringBuilder sb = new StringBuilder("rsync -cr ");
+ sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
+ CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
+ }
+
+ private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+ destinationURI) throws WorkerException, IOException, JSchException {
+ /**
+ * scp third party file transfer 'to' compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, false);
+ }
+
+ private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+ throws AiravataException, IOException, JSchException, WorkerException {
+
+ /**
+ * scp third party file transfer 'from' comute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO, true);
+ // update output locations
+ DataStagingUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+ DataStagingUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+
+ }
+
+ private void makeDir(TaskContext taskContext, URI pathURI) throws WorkerException {
+ int endIndex = pathURI.getPath().lastIndexOf('/');
+ if (endIndex < 1) {
+ return;
+ }
+ String targetPath = pathURI.getPath().substring(0, endIndex);
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().makeDirectory(targetPath);
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ TaskState state = taskContext.getTaskStatus().getState();
+ if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+ return execute(taskContext);
+ } else {
+ // files already transferred or failed
+ return taskContext.getTaskStatus();
+ }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
new file mode 100644
index 0000000..51996b8
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
@@ -0,0 +1,30 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Ajinkya on 4/13/17.
+ */
+public class DataStagingFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(DataStagingFactory.class);
+
+ public static SSHKeyAuthentication getComputerResourceSSHKeyAuthentication(ProcessContext pc)
+ throws WorkerException, CredentialStoreException {
+ try {
+ return WorkerFactory.getSshKeyAuthentication(pc.getGatewayId(),
+ pc.getComputeResourceLoginUserName(),
+ pc.getComputeResourceCredentialToken());
+ } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) {
+ throw new WorkerException("Couldn't build ssh authentication object", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
new file mode 100644
index 0000000..d3d7522
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
@@ -0,0 +1,107 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Created by Ajinkya on 4/13/17.
+ */
+public class DataStagingUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(DataStagingUtils.class);
+
+ public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException {
+ String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+ String filePath;
+ if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+ if(!experimentDataDir.endsWith(File.separator)){
+ experimentDataDir += File.separator;
+ }
+ if (experimentDataDir.startsWith(File.separator)) {
+ filePath = experimentDataDir + fileName;
+ } else {
+ filePath = inputPath + experimentDataDir + fileName;
+ }
+ } else {
+ filePath = inputPath + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ }
+ //FIXME
+ return new URI("file", taskContext.getParentProcessContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+
+ }
+ public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ String experimentId = processContext.getExperimentId();
+ ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+ for (OutputDataObjectType expOutput : experimentOutputs){
+ if (expOutput.getName().equals(outputName)){
+ DataProductModel dataProductModel = new DataProductModel();
+ dataProductModel.setGatewayId(processContext.getGatewayId());
+ dataProductModel.setOwnerName(processContext.getProcessModel().getUserName());
+ dataProductModel.setProductName(outputName);
+ dataProductModel.setDataProductType(DataProductType.FILE);
+
+ DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+ replicaLocationModel.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+ replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+ replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+ replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+ replicaLocationModel.setFilePath(outputVal);
+ dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+ ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+ String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+ expOutput.setValue(productUri);
+ }
+ }
+ }
+ experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating experiment outputs";
+ throw new WorkerException(msg, e);
+ }
+ }
+
+ public static void saveProcessOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ String processId = processContext.getProcessId();
+ List<OutputDataObjectType> processOutputs = (List<OutputDataObjectType> )experimentCatalog.get(ExperimentCatalogModelType.PROCESS_OUTPUT, processId);
+ if (processOutputs != null && !processOutputs.isEmpty()){
+ for (OutputDataObjectType processOutput : processOutputs){
+ if (processOutput.getName().equals(outputName)){
+ processOutput.setValue(outputVal);
+ }
+ }
+ }
+ ProcessModel processModel = processContext.getProcessModel();
+ processModel.setProcessOutputs(processOutputs);
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating experiment outputs";
+ throw new WorkerException(msg, e);
+ }
+ }
+}