You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/09 21:25:58 UTC
[2/2] airavata git commit: Added Local batch job submission support
and renamed task implementations to have generic names
Added Local batch job submission support and renamed task implementations to have generic names
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dc1be312
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dc1be312
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dc1be312
Branch: refs/heads/master
Commit: dc1be3126409992f6de94d76e0fc4834540e9e9c
Parents: c62f74a
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Nov 9 15:25:52 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Nov 9 15:25:52 2015 -0500
----------------------------------------------------------------------
.../server/src/main/resources/gfac-config.yaml | 11 +-
.../org/apache/airavata/gfac/impl/Factory.java | 16 +-
.../airavata/gfac/impl/GFacEngineImpl.java | 4 +-
.../airavata/gfac/impl/LocalCommandOutput.java | 2 +-
.../impl/task/AdvancedSCPDataStageTask.java | 344 -----------------
.../airavata/gfac/impl/task/DataStageTask.java | 126 +++++++
.../impl/task/DefaultJobSubmissionTask.java | 286 +++++++++++++++
.../gfac/impl/task/EnvironmentSetupTask.java | 74 ++++
.../gfac/impl/task/ForkJobSubmissionTask.java | 184 ++++++++++
.../gfac/impl/task/SCPDataStageTask.java | 365 +++++++++++++++----
.../gfac/impl/task/SSHEnvironmentSetupTask.java | 74 ----
.../impl/task/SSHForkJobSubmissionTask.java | 184 ----------
.../gfac/impl/task/SSHJobSubmissionTask.java | 286 ---------------
13 files changed, 980 insertions(+), 976 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/configuration/server/src/main/resources/gfac-config.yaml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml
index 8dafe09..dff4062 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.yaml
+++ b/modules/configuration/server/src/main/resources/gfac-config.yaml
@@ -20,7 +20,7 @@
jobSubmitters:
- submissionProtocol: SSH
- taskClass: org.apache.airavata.gfac.impl.task.SSHJobSubmissionTask
+ taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask
# properties:
# - userName: airavata
# passPhrase: airavata
@@ -29,10 +29,11 @@ jobSubmitters:
# hostName: remote.client.hostName
- submissionProtocol: SSH_FORK
- taskClass: org.apache.airavata.gfac.impl.task.SSHForkJobSubmissionTask
+ taskClass: org.apache.airavata.gfac.impl.task.ForkJobSubmissionTask
- submissionProtocol: LOCAL
- taskClass: org.apache.airavata.gfac.impl.task.LocalJobSubmissionTask
+ taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask
+
# Following job subbmitters are not yet implemented.
@@ -47,14 +48,14 @@ commonTasks:
fileTransferTasks:
- transferProtocol: SCP
- taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
+ taskClass: org.apache.airavata.gfac.impl.task.DataStageTask
# If your client doen't run the same instance where airavata server is running then you need to comment above
# SCPDataStageTask and uncomment AdvancedSCPDataStageTask. To work with AdvancedSCPDataStageTask, you either need to
# provide ssh keys or password.
# - transferProtocol: SCP
-# taskClass: org.apache.airavata.gfac.impl.task.AdvancedSCPDataStageTask
+# taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
# properties:
# - userName: airavata
# passPhrase: airavata
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index e2966c5..4dc63e6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -207,11 +207,17 @@ public abstract class Factory {
String computeResourceId = processContext.getComputeResourceId();
String key = processContext.getJobSubmissionProtocol().toString() + ":" + computeResourceId;
RemoteCluster remoteCluster = remoteClusterMap.get(key);
- if (remoteCluster == null) {
- JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
- AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
- remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo);
- remoteClusterMap.put(key, remoteCluster);
+ JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol();
+ if (remoteCluster == null) {
+ JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+ if (jobSubmissionProtocol == JobSubmissionProtocol.LOCAL) {
+ remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null);
+ } else if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
+ jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+ AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
+ remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo);
+ }
+ remoteClusterMap.put(key, remoteCluster);
}
return remoteCluster;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 6444eb4..d386f66 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -35,7 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
+import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
@@ -356,7 +356,7 @@ public class GFacEngineImpl implements GFacEngine {
EnvironmentSetupTaskModel subTaskModel = (EnvironmentSetupTaskModel) taskContext.getSubTaskModel();
Task envSetupTask = null;
if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) {
- envSetupTask = new SSHEnvironmentSetupTask();
+ envSetupTask = new EnvironmentSetupTask();
} else {
throw new GFacException("Unsupported security protocol, Airavata doesn't support " +
subTaskModel.getProtocol().name() + " protocol yet.");
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
index e9d683d..4d98423 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
@@ -55,6 +55,6 @@ public class LocalCommandOutput implements CommandOutput {
@Override
public int getExitCode() {
- return 0;
+ return process.exitValue();
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
deleted file mode 100644
index 6c4e14e..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.credential.store.credential.Credential;
-import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
-import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
-import org.apache.airavata.gfac.core.cluster.CommandInfo;
-import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This will be used for both Input file staging and output file staging, hence if you do any changes to a part of logic
- * in this class please consider that will works with both input and output cases.
- */
-public class AdvancedSCPDataStageTask implements Task {
- private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
- private static final int DEFAULT_SSH_PORT = 22;
- private String password;
- private String publicKeyPath;
- private String passPhrase;
- private String privateKeyPath;
- private String userName;
- private String hostName;
- private String inputPath;
-
- @Override
- public void init(Map<String, String> propertyMap) throws TaskException {
- inputPath = propertyMap.get("inputPath");
- hostName = propertyMap.get("hostName");
- userName = propertyMap.get("userName");
- }
-
- @Override
- public TaskStatus execute(TaskContext taskContext) {
- TaskStatus status = new TaskStatus(TaskState.EXECUTING);
- AuthenticationInfo authenticationInfo = null;
- DataStagingTaskModel subTaskModel = null;
- String localDataDir = null;
- ProcessState processState = taskContext.getParentProcessContext().getProcessState();
- try {
- subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
- (taskContext.getTaskModel());
- if (processState == ProcessState.OUTPUT_DATA_STAGING) {
- OutputDataObjectType processOutput = taskContext.getProcessOutput();
- if (processOutput != null && processOutput.getValue() == null) {
- log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
- taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
- processOutput.getName());
- status = new TaskStatus(TaskState.FAILED);
- if (processOutput.isIsRequired()) {
- status.setReason("File name is null, but this output's isRequired bit is not set");
- } else {
- status.setReason("File name is null");
- }
- return status;
- }
- } else if (processState == ProcessState.INPUT_DATA_STAGING) {
- InputDataObjectType processInput = taskContext.getProcessInput();
- if (processInput != null && processInput.getValue() == null) {
- log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
- taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
- processInput.getName());
- status = new TaskStatus(TaskState.FAILED);
- if (processInput.isIsRequired()) {
- status.setReason("File name is null, but this input's isRequired bit is not set");
- } else {
- status.setReason("File name is null");
- }
- return status;
- }
- } else {
- status.setState(TaskState.FAILED);
- status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
- "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
- return status;
- }
-
- // use rsync instead of scp if source and destination host and user name is same.
- URI sourceURI = new URI(subTaskModel.getSource());
- String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
- sourceURI.getPath().length());
- URI destinationURI = null;
- if (subTaskModel.getDestination().startsWith("dummy")) {
- destinationURI = getDestinationURI(taskContext, fileName);
- subTaskModel.setDestination(destinationURI.toString());
- } else {
- destinationURI = new URI(subTaskModel.getDestination());
- }
-
- if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
- && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
- localDataCopy(taskContext, sourceURI, destinationURI);
- status.setState(TaskState.COMPLETED);
- status.setReason("Locally copied file using 'cp' command ");
- return status;
- }
-
-
- String tokenId = taskContext.getParentProcessContext().getTokenId();
- CredentialReader credentialReader = GFacUtils.getCredentialReader();
- Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
- if (credential instanceof SSHCredential) {
- SSHCredential sshCredential = (SSHCredential) credential;
- byte[] publicKey = sshCredential.getPublicKey();
- publicKeyPath = writeFileToDisk(publicKey);
- byte[] privateKey = sshCredential.getPrivateKey();
- privateKeyPath = writeFileToDisk(privateKey);
- passPhrase = sshCredential.getPassphrase();
-// userName = sshCredential.getPortalUserName(); // this might not same as login user name
- authenticationInfo = getSSHKeyAuthentication();
- } else {
- String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
- log.error(msg);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(msg);
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- return status;
- }
- status = new TaskStatus(TaskState.COMPLETED);
-
- ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
- Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
- if (processState == ProcessState.INPUT_DATA_STAGING) {
- inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
- status.setReason("Successfully staged input data");
- } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
- String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
- SSHUtils.makeDirectory(targetPath, sshSession);
- // TODO - save updated subtask model with new destination
- outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
- status.setReason("Successfully staged output data");
- }
- } catch (TException e) {
- String msg = "Couldn't create subTask model thrift model";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- return status;
- } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
- String msg = "Failed while reading credentials";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (URISyntaxException e) {
- String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
- "destination : " + subTaskModel.getDestination();
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (SSHApiException e) {
- String msg = "Failed to do scp with compute resource";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (AiravataException e) {
- String msg = "Error while creating ssh session with client";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (JSchException | IOException e) {
- String msg = "Failed to do scp with client";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (GFacException e) {
- String msg = "Failed update experiment and process inputs and outputs";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- }
- return status;
- }
-
- private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
- StringBuilder sb = new StringBuilder("rsync -cr ");
- sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
- CommandInfo commandInfo = new RawCommandInfo(sb.toString());
- taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
- }
-
- private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
- destinationURI) throws SSHApiException, IOException, JSchException {
- /**
- * scp third party file transfer 'to' compute resource.
- */
- taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
- destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
- }
-
- private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
- throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
-
- /**
- * scp third party file transfer 'from' comute resource.
- */
- taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
- destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
- // update output locations
- GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
- GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
-
- }
-
- @Override
- public TaskStatus recover(TaskContext taskContext) {
- TaskState state = taskContext.getTaskStatus().getState();
- if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
- return execute(taskContext);
- } else {
- // files already transferred or failed
- return taskContext.getTaskStatus();
- }
- }
-
- @Override
- public TaskTypes getType() {
- return TaskTypes.DATA_STAGING;
- }
-
- private SSHPasswordAuthentication getSSHPasswordAuthentication() {
- return new SSHPasswordAuthentication(userName, password);
- }
-
- private SSHKeyAuthentication getSSHKeyAuthentication() {
- SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
- sshKA.setUserName(userName);
- sshKA.setPassphrase(passPhrase);
- sshKA.setPrivateKeyFilePath(privateKeyPath);
- sshKA.setPublicKeyFilePath(publicKeyPath);
- sshKA.setStrictHostKeyChecking("no");
- return sshKA;
- }
-
- private String writeFileToDisk(byte[] data) {
- File temp = null;
- try {
- temp = File.createTempFile("id_rsa", "");
- //write it
- FileOutputStream bw = new FileOutputStream(temp);
- bw.write(data);
- bw.close();
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- return temp.getAbsolutePath();
- }
-
- public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
- String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
- taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
- return new URI("SCP", hostName, filePath, null);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
new file mode 100644
index 0000000..ab9d562
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class DataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(DataStageTask.class);
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+ if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ + taskContext.getTaskModel().getTaskType().toString());
+ } else {
+ try {
+ DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+ .getTaskModel());
+ URI sourceURI = new URI(subTaskModel.getSource());
+ URI destinationURI = new URI(subTaskModel.getDestination());
+
+ ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
+ /**
+ * copy local file to compute resource.
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+ .getPath());
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ /**
+ * copy remote file from compute resource.
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+ .getPath());
+ }
+ status.setReason("Successfully staged data");
+ } catch (SSHApiException e) {
+ String msg = "Scp attempt failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (TException e) {
+ String msg = "Invalid task invocation";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (URISyntaxException e) {
+ String msg = "source or destination is not a valid URI";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ TaskState state = taskContext.getTaskStatus().getState();
+ if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+ return execute(taskContext);
+ } else {
+ // files already transferred or failed
+ return taskContext.getTaskStatus();
+ }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..020880d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Batch job submission task: builds a job descriptor and job file for the
+ * resolved resource job manager, submits it through the RemoteCluster, and
+ * verifies that the scheduler actually assigned a job id before marking the
+ * task COMPLETED. Every failure path records an ErrorModel on the task model
+ * and the final TaskStatus is persisted and published before returning.
+ */
+public class DefaultJobSubmissionTask implements JobSubmissionTask {
+ private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
+ // No-op: this task takes no configuration properties.
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ // Flow: create job file -> submit -> if no job id came back, poll the
+ // scheduler by job name (up to 3 tries with growing backoff) -> persist
+ // job model/status -> save and publish the task status.
+ @Override
+ public TaskStatus execute(TaskContext taskContext){
+ TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ jobModel.setTaskId(taskContext.getTaskId());
+ RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
+ jobModel.setJobName(jobDescriptor.getJobName());
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+ if (resourceJobManager != null) {
+ jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+ }
+ JobStatus jobStatus = new JobStatus();
+ File jobFile = GFacUtils.createJobFile(taskContext, jobDescriptor, jConfig);
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+ processContext.getWorkingDir());
+ jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+ jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+ jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+ String jobId = jobSubmissionOutput.getJobId();
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobModel(processContext, jobModel);
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+ .getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatus(jobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ // Double-check with the scheduler; a resolvable status promotes the job to QUEUED.
+ if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+ jobStatus.setJobState(JobState.QUEUED);
+ jobStatus.setReason("Verification step succeeded");
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatus(jobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ }
+ taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setReason("Submitted job to compute resource");
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ } else {
+ // Submission did not return a job id; retry a by-name lookup with increasing backoff.
+ int verificationTryCount = 0;
+ while (verificationTryCount++ < 3) {
+ String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+ if (verifyJobId != null && !verifyJobId.isEmpty()) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ jobId = verifyJobId;
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobModel(processContext,jobModel);
+ jobStatus.setJobState(JobState.QUEUED);
+ jobStatus.setReason("Verification step succeeded");
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatus(jobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ taskStatus.setState(TaskState.COMPLETED);
+ taskStatus.setReason("Submitted job to compute resource");
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ break;
+ }
+ log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+ Thread.sleep(verificationTryCount * 10000);
+ }
+ }
+
+ // Neither submit nor verify produced a job id: record the failure on
+ // the experiment, the process, and the task.
+ if (jobId == null || jobId.isEmpty()) {
+ String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+ "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+ "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+ log.error(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage(msg);
+ errorModel.setActualErrorMessage(msg);
+ GFacUtils.saveExperimentError(processContext, errorModel);
+ GFacUtils.saveProcessError(processContext, errorModel);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ }else {
+ GFacUtils.saveJobModel(processContext, jobModel);
+ }
+ } else {
+ taskStatus.setState(TaskState.FAILED);
+ if (jobFile == null) {
+ taskStatus.setReason("JobFile is null");
+ } else {
+ taskStatus.setReason("Job file doesn't exist");
+ }
+ }
+
+ } catch (AppCatalogException e) {
+ String msg = "Error while instantiating app catalog";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (ApplicationSettingsException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (GFacException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (SSHApiException e) {
+ String msg = "Error occurred while submitting the job";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (IOException e) {
+ String msg = "Error while reading the content of the job file";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (InterruptedException e) {
+ // NOTE(review): the thread's interrupt status is not restored here
+ // (Thread.currentThread().interrupt()) — confirm that is intended.
+ String msg = "Error occurred while verifying the job submission";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+
+ taskContext.setTaskStatus(taskStatus);
+ try {
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ } catch (GFacException e) {
+ log.error("Error while saving task status", e);
+ }
+ return taskStatus;
+ }
+
+ // Returns true when the scheduler reports any resolvable (non-UNKNOWN) state for the job id.
+ private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {
+ JobStatus status = remoteCluster.getJobStatus(jobID);
+ return status != null && status.getJobState() != JobState.UNKNOWN;
+ }
+
+ // Best-effort job-id lookup by job name; returns null when the lookup fails.
+ private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) {
+ String jobName = jobDetails.getJobName();
+ String jobId = null;
+ try {
+ jobId = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName());
+ } catch (SSHApiException e) {
+ log.error("Error while verifying JobId from JobName");
+ }
+ return jobId;
+ }
+
+
+ // Recovery resubmits only if the original attempt never produced a job id;
+ // once a job id exists, the job monitor owns recovery.
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ // original job failed before submitting
+ if (jobModel == null || jobModel.getJobId() == null ){
+ return execute(taskContext);
+ }else {
+ // job is already submitted and monitor should handle the recovery
+ return new TaskStatus(TaskState.COMPLETED);
+ }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
+
+ // Cancels the remote job; retries the status lookup up to 5 times (with
+ // growing sleeps) before declaring the job unknown on the resource.
+ @Override
+ public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+ ProcessContext processContext = taskcontext.getParentProcessContext();
+ RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ JobModel jobModel = processContext.getJobModel();
+ int retryCount = 0;
+ if (jobModel != null) {
+ try {
+ JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+ while (oldJobStatus == null && retryCount <= 5) {
+ retryCount++;
+ Thread.sleep(retryCount * 1000);
+ oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
+ }
+ if (oldJobStatus != null) {
+ oldJobStatus = remoteCluster.cancelJob(jobModel.getJobId());
+ return oldJobStatus;
+ } else {
+ throw new TaskException("Cancel operation failed, Job status couldn't find in resource, JobId " +
+ jobModel.getJobId());
+ }
+ } catch (SSHApiException | InterruptedException e) {
+ throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e);
+ }
+ } else {
+ throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
new file mode 100644
index 0000000..fff130c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Prepares the remote execution environment for a process by creating its
+ * working directory on the remote cluster. On failure the task is marked
+ * FAILED and an ErrorModel is attached to the task model.
+ */
+public class EnvironmentSetupTask implements Task {
+
+ private static final Logger log = LoggerFactory.getLogger(EnvironmentSetupTask.class);
+ // No-op: this task takes no configuration properties.
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+ try {
+ RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
+ remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+ status.setReason("Successfully created environment");
+ } catch (SSHApiException e) {
+ String msg = "Error while environment setup";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+ return status;
+ }
+
+ // Recovery simply re-executes; assumes makeDirectory can safely be repeated — TODO confirm.
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ return execute(taskContext);
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.ENV_SETUP;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..ed75fef
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Fork-style job submission task: builds a job descriptor and job file and
+ * submits it through the RemoteCluster. Unlike the batch path, the task
+ * status starts as CREATED and there is no by-name verification retry loop;
+ * a missing job id immediately fails the experiment, process, and task.
+ */
+public class ForkJobSubmissionTask implements JobSubmissionTask {
+ private static final Logger log = LoggerFactory.getLogger(ForkJobSubmissionTask.class);
+ // No-op: this task takes no configuration properties.
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ jobModel.setTaskId(taskContext.getTaskId());
+ RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
+ jobModel.setJobName(jobDescriptor.getJobName());
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+ if (resourceJobManager != null) {
+ jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+ }
+ JobStatus jobStatus = new JobStatus();
+ File jobFile = GFacUtils.createJobFile(taskContext, jobDescriptor, jConfig);
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ // NOTE(review): the fork path reuses submitBatchJob; confirm the fork
+ // job manager configuration makes this execute the job directly.
+ JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+ processContext.getWorkingDir());
+ jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+ jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+ jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+ String jobId = jobSubmissionOutput.getJobId();
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobModel(processContext, jobModel);
+ jobStatus.setJobState(JobState.SUBMITTED);
+ jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+ .getComputeResourceDescription().getHostName());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatus(jobStatus);
+ GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setReason("Submitted job to compute resource");
+ }
+ // No job id returned: record the failure on experiment, process, and task.
+ if (jobId == null || jobId.isEmpty()) {
+ String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+ "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+ "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+ log.error(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(msg);
+ errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ GFacUtils.saveExperimentError(processContext, errorModel);
+ GFacUtils.saveProcessError(processContext, errorModel);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
+ }else {
+ GFacUtils.saveJobModel(processContext, jobModel);
+ }
+ } else {
+ taskStatus.setState(TaskState.FAILED);
+ if (jobFile == null) {
+ taskStatus.setReason("JobFile is null");
+ } else {
+ taskStatus.setReason("Job file doesn't exist");
+ }
+ }
+ } catch (ApplicationSettingsException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (AppCatalogException e) {
+ String msg = "Error while instantiating app catalog";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (GFacException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (SSHApiException e) {
+ String msg = "Error occurred while submitting the job";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (IOException e) {
+ String msg = "Error while reading the content of the job file";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+ return taskStatus;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ //TODO implement recovery scenario instead of calling execute.
+ return execute(taskContext);
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
+
+ @Override
+ public JobStatus cancel(TaskContext taskcontext) {
+ // TODO - implement cancel with SSH Fork
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 32ee31b..678ded1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -20,11 +20,32 @@
*/
package org.apache.airavata.gfac.impl.task;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.TaskState;
@@ -35,81 +56,240 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
+/**
+ * This class is used for both input file staging and output file staging; if you change any part of the logic
+ * in this class, please make sure it still works correctly for both the input and the output case.
+ */
public class SCPDataStageTask implements Task {
- private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
-
- @Override
- public void init(Map<String, String> propertyMap) throws TaskException {
-
- }
-
- @Override
- public TaskStatus execute(TaskContext taskContext) {
- TaskStatus status = new TaskStatus(TaskState.COMPLETED);
- if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
- status.setState(TaskState.FAILED);
- status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
- + taskContext.getTaskModel().getTaskType().toString());
- } else {
- try {
- DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
- .getTaskModel());
- URI sourceURI = new URI(subTaskModel.getSource());
- URI destinationURI = new URI(subTaskModel.getDestination());
-
- ProcessState processState = taskContext.getParentProcessContext().getProcessState();
- if (processState == ProcessState.INPUT_DATA_STAGING) {
- /**
- * copy local file to compute resource.
- */
- taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
- .getPath());
- } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
- /**
- * copy remote file from compute resource.
- */
- taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
- .getPath());
- }
- status.setReason("Successfully staged data");
- } catch (SSHApiException e) {
- String msg = "Scp attempt failed";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (TException e) {
- String msg = "Invalid task invocation";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- } catch (URISyntaxException e) {
- String msg = "source or destination is not a valid URI";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- }
- }
- return status;
- }
-
- @Override
- public TaskStatus recover(TaskContext taskContext) {
+ private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+ private static final int DEFAULT_SSH_PORT = 22;
+ private String password;
+ private String publicKeyPath;
+ private String passPhrase;
+ private String privateKeyPath;
+ private String userName;
+ private String hostName;
+ private String inputPath;
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+ inputPath = propertyMap.get("inputPath");
+ hostName = propertyMap.get("hostName");
+ userName = propertyMap.get("userName");
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+ AuthenticationInfo authenticationInfo = null;
+ DataStagingTaskModel subTaskModel = null;
+ String localDataDir = null;
+ ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+ try {
+ subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+ (taskContext.getTaskModel());
+ if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ OutputDataObjectType processOutput = taskContext.getProcessOutput();
+ if (processOutput != null && processOutput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processOutput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processOutput.isIsRequired()) {
+ status.setReason("File name is null, but this output's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+ InputDataObjectType processInput = taskContext.getProcessInput();
+ if (processInput != null && processInput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processInput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processInput.isIsRequired()) {
+ status.setReason("File name is null, but this input's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+ "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+ return status;
+ }
+
+ // use rsync instead of scp if the source and destination host and user name are the same.
+ URI sourceURI = new URI(subTaskModel.getSource());
+ String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+ URI destinationURI = null;
+ if (subTaskModel.getDestination().startsWith("dummy")) {
+ destinationURI = getDestinationURI(taskContext, fileName);
+ subTaskModel.setDestination(destinationURI.toString());
+ } else {
+ destinationURI = new URI(subTaskModel.getDestination());
+ }
+
+ if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+ && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+ localDataCopy(taskContext, sourceURI, destinationURI);
+ status.setState(TaskState.COMPLETED);
+ status.setReason("Locally copied file using 'cp' command ");
+ return status;
+ }
+
+
+ String tokenId = taskContext.getParentProcessContext().getTokenId();
+ CredentialReader credentialReader = GFacUtils.getCredentialReader();
+ Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
+ if (credential instanceof SSHCredential) {
+ SSHCredential sshCredential = (SSHCredential) credential;
+ byte[] publicKey = sshCredential.getPublicKey();
+ publicKeyPath = writeFileToDisk(publicKey);
+ byte[] privateKey = sshCredential.getPrivateKey();
+ privateKeyPath = writeFileToDisk(privateKey);
+ passPhrase = sshCredential.getPassphrase();
+// userName = sshCredential.getPortalUserName(); // this might not be the same as the login user name
+ authenticationInfo = getSSHKeyAuthentication();
+ } else {
+ String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
+ log.error(msg);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(msg);
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ return status;
+ }
+ status = new TaskStatus(TaskState.COMPLETED);
+
+ ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+ Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
+ inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ status.setReason("Successfully staged input data");
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
+ SSHUtils.makeDirectory(targetPath, sshSession);
+ // TODO - save updated subtask model with new destination
+ outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ status.setReason("Successfully staged output data");
+ }
+ } catch (TException e) {
+ String msg = "Couldn't create subTask model thrift model";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ return status;
+ } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
+ String msg = "Failed while reading credentials";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (URISyntaxException e) {
+ String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
+ "destination : " + subTaskModel.getDestination();
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (SSHApiException e) {
+ String msg = "Failed to do scp with compute resource";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (AiravataException e) {
+ String msg = "Error while creating ssh session with client";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (JSchException | IOException e) {
+ String msg = "Failed to do scp with client";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (GFacException e) {
+ String msg = "Failed update experiment and process inputs and outputs";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+ return status;
+ }
+
+ private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
+ StringBuilder sb = new StringBuilder("rsync -cr ");
+ sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
+ CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+ taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+ }
+
+ private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+ destinationURI) throws SSHApiException, IOException, JSchException {
+ /**
+ * scp third party file transfer 'to' compute resource.
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
+ }
+
+ private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+ throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
+
+ /**
+ * scp third party file transfer 'from' compute resource.
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
+ // update output locations
+ GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+ GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
TaskState state = taskContext.getTaskStatus().getState();
if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
return execute(taskContext);
@@ -117,10 +297,45 @@ public class SCPDataStageTask implements Task {
// files already transferred or failed
return taskContext.getTaskStatus();
}
- }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+
+ private SSHPasswordAuthentication getSSHPasswordAuthentication() {
+ return new SSHPasswordAuthentication(userName, password);
+ }
+
+ private SSHKeyAuthentication getSSHKeyAuthentication() {
+ SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+ sshKA.setUserName(userName);
+ sshKA.setPassphrase(passPhrase);
+ sshKA.setPrivateKeyFilePath(privateKeyPath);
+ sshKA.setPublicKeyFilePath(publicKeyPath);
+ sshKA.setStrictHostKeyChecking("no");
+ return sshKA;
+ }
+
+ private String writeFileToDisk(byte[] data) {
+ File temp = null;
+ try {
+ temp = File.createTempFile("id_rsa", "");
+ //write it
+ FileOutputStream bw = new FileOutputStream(temp);
+ bw.write(data);
+ bw.close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ return temp.getAbsolutePath();
+ }
+
+ public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
+ String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+ taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ return new URI("SCP", hostName, filePath, null);
- @Override
- public TaskTypes getType() {
- return TaskTypes.DATA_STAGING;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
deleted file mode 100644
index d28ae3f..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.TaskTypes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class SSHEnvironmentSetupTask implements Task {
-
- private static final Logger log = LoggerFactory.getLogger(SSHEnvironmentSetupTask.class);
- @Override
- public void init(Map<String, String> propertyMap) throws TaskException {
-
- }
-
- @Override
- public TaskStatus execute(TaskContext taskContext) {
- TaskStatus status = new TaskStatus(TaskState.COMPLETED);
- try {
- RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
- remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
- status.setReason("Successfully created environment");
- } catch (SSHApiException e) {
- String msg = "Error while environment setup";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- }
- return status;
- }
-
- @Override
- public TaskStatus recover(TaskContext taskContext) {
- return execute(taskContext);
- }
-
- @Override
- public TaskTypes getType() {
- return TaskTypes.ENV_SETUP;
- }
-}