You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/10/05 22:38:06 UTC
airavata git commit: Set sub task model destination and source with
correct URI string
Repository: airavata
Updated Branches:
refs/heads/master 4d07c015b -> 808ff55bd
Set sub task model destination and source with correct URI string
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/808ff55b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/808ff55b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/808ff55b
Branch: refs/heads/master
Commit: 808ff55bd1cd79b667077166bbaa9ab0314934f0
Parents: 4d07c01
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Oct 5 16:37:58 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Oct 5 16:37:58 2015 -0400
----------------------------------------------------------------------
.../gfac/core/context/ProcessContext.java | 13 +-
.../org/apache/airavata/gfac/impl/Factory.java | 1 +
.../airavata/gfac/impl/GFacEngineImpl.java | 34 ++-
.../impl/task/AdvancedSCPDataStageTask.java | 256 ++++++++++---------
4 files changed, 173 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 5a773e7..8655142 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.core.context;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -79,7 +80,9 @@ public class ProcessContext {
private ResourceJobManager resourceJobManager;
private boolean handOver;
private boolean cancel;
- /**
+ private ServerInfo serverInfo;
+
+ /**
* Note: process context property use lazy loading approach. In runtime you will see some properties as null
* unless you have access it previously. Once that property access using the api,it will be set to correct value.
*/
@@ -361,4 +364,12 @@ public class ProcessContext {
public boolean isInterrupted(){
return this.cancel || this.handOver;
}
+
+ public void setServerInfo(ServerInfo serverInfo) {
+ this.serverInfo = serverInfo;
+ }
+
+ public ServerInfo getServerInfo() {
+ return serverInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbba17b..e60e6eb 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -210,6 +210,7 @@ public abstract class Factory {
if (remoteCluster == null) {
String hostName = Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(computeResourceId).getHostName();
ServerInfo serverInfo = new ServerInfo(processContext.getComputeResourcePreference().getLoginUserName(), hostName);
+ processContext.setServerInfo(serverInfo);
JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
AuthenticationInfo authenticationInfo = getSSHKeyAuthentication();
remoteCluster = new HPCRemoteCluster(serverInfo, jobManagerConfiguration, authenticationInfo);
http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index b2be6be..a7f6b32 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -25,9 +25,11 @@ import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.GFacConstants;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
@@ -68,6 +70,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
@@ -251,7 +255,7 @@ public class GFacEngineImpl implements GFacEngine {
case URI:
try {
taskCtx = getDataStagingTaskContext(processContext, processInput);
- } catch (TException e) {
+ } catch (TException | TaskException e) {
throw new GFacException("Error while serializing data staging sub task model");
}
saveTaskModel(taskCtx);
@@ -442,7 +446,7 @@ public class GFacEngineImpl implements GFacEngine {
case URI: case STDERR: case STDOUT:
try {
taskCtx = getDataStagingTaskContext(processContext, processOutput);
- } catch (TException e) {
+ } catch (TException | TaskException e) {
throw new GFacException("Thrift model to byte[] conversion issue", e);
}
saveTaskModel(taskCtx);
@@ -557,7 +561,7 @@ public class GFacEngineImpl implements GFacEngine {
}
private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput)
- throws TException {
+ throws TException, TaskException {
TaskContext taskCtx = new TaskContext();
taskCtx.setParentProcessContext(processContext);
// create new task model for this task
@@ -572,7 +576,15 @@ public class GFacEngineImpl implements GFacEngine {
// create data staging sub task model
DataStagingTaskModel submodel = new DataStagingTaskModel();
submodel.setSource(processInput.getValue());
- submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir());
+ ServerInfo serverInfo = processContext.getServerInfo();
+ URI destination = null;
+ try {
+ destination = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getHost(),
+ serverInfo.getUserName(), serverInfo.getPort(), processContext.getWorkingDir(), null, null);
+ } catch (URISyntaxException e) {
+ throw new TaskException("Error while constructing destination file URI");
+ }
+ submodel.setDestination(destination.toString());
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
taskCtx.setTaskModel(taskModel);
taskCtx.setProcessInput(processInput);
@@ -580,7 +592,7 @@ public class GFacEngineImpl implements GFacEngine {
}
private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
- throws TException {
+ throws TException, TaskException {
TaskContext taskCtx = new TaskContext();
taskCtx.setParentProcessContext(processContext);
// create new task model for this task
@@ -596,8 +608,16 @@ public class GFacEngineImpl implements GFacEngine {
String remoteOutputDir = processContext.getOutputDir();
remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
DataStagingTaskModel submodel = new DataStagingTaskModel();
- submodel.setSource(processContext.getDataMovementProtocol().name() + ":" + remoteOutputDir + processOutput
- .getValue());
+ ServerInfo serverInfo = processContext.getServerInfo();
+ URI source = null;
+ try {
+ source = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getHost(),
+ serverInfo.getUserName(), serverInfo.getPort(), remoteOutputDir + processOutput.getValue(), null, null);
+ } catch (URISyntaxException e) {
+ throw new TaskException("Error while constructing source file URI");
+ }
+ submodel.setSource(source.toString());
+ // TODO after thridpary scp implemented we can fix following destination location correct one.
String localWorkingDir = processContext.getLocalWorkingDir();
submodel.setDestination("file://" + localWorkingDir);
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
http://git-wip-us.apache.org/repos/asf/airavata/blob/808ff55b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
index 1c72fab..269ce10 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -70,68 +70,86 @@ import java.util.Map;
* This will be used for both Input file staging and output file staging, hence if you do any changes to a part of logic
* in this class please consider that will works with both input and output cases.
*/
-public class AdvancedSCPDataStageTask implements Task{
- private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
- private static final int DEFAULT_SSH_PORT = 22;
- private String password;
- private String publicKeyPath;
- private String passPhrase;
- private String privateKeyPath;
- private String userName;
- private String hostName;
- private String inputPath;
+public class AdvancedSCPDataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
+ private static final int DEFAULT_SSH_PORT = 22;
+ private String password;
+ private String publicKeyPath;
+ private String passPhrase;
+ private String privateKeyPath;
+ private String userName;
+ private String hostName;
+ private String inputPath;
- @Override
- public void init(Map<String, String> propertyMap) throws TaskException {
- inputPath = propertyMap.get("inputPath");
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+ inputPath = propertyMap.get("inputPath");
hostName = propertyMap.get("hostName");
userName = propertyMap.get("userName");
- }
+ }
- @Override
- public TaskStatus execute(TaskContext taskContext) {
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
TaskStatus status = new TaskStatus(TaskState.CREATED);
AuthenticationInfo authenticationInfo = null;
DataStagingTaskModel subTaskModel = null;
- ProcessState processState = taskContext.getParentProcessContext().getProcessState();
- if (processState == ProcessState.OUTPUT_DATA_STAGING) {
- OutputDataObjectType processOutput = taskContext.getProcessOutput();
- if (processOutput != null && processOutput.getValue() == null) {
- log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
- taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
- processOutput.getName());
- status = new TaskStatus(TaskState.FAILED);
- if (processOutput.isIsRequired()) {
- status.setReason("File name is null, but this output's isRequired bit is not set");
- } else {
- status.setReason("File name is null");
- }
- return status;
- }
- } else if (processState == ProcessState.INPUT_DATA_STAGING) {
- InputDataObjectType processInput = taskContext.getProcessInput();
- if (processInput != null && processInput.getValue() == null) {
- log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
- taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
- processInput.getName());
- status = new TaskStatus(TaskState.FAILED);
- if (processInput.isIsRequired()) {
- status.setReason("File name is null, but this input's isRequired bit is not set");
- } else {
- status.setReason("File name is null");
- }
- return status;
- }
- } else {
- status.setState(TaskState.FAILED);
- status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
- "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
- }
+ String localDataDir = null;
+ ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+ try {
+ subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+ (taskContext.getTaskModel());
+ if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ OutputDataObjectType processOutput = taskContext.getProcessOutput();
+ if (processOutput != null && processOutput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processOutput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processOutput.isIsRequired()) {
+ status.setReason("File name is null, but this output's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ localDataDir = subTaskModel.getDestination();
+ return status;
+ }
+ } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+ InputDataObjectType processInput = taskContext.getProcessInput();
+ if (processInput != null && processInput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processInput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processInput.isIsRequired()) {
+ status.setReason("File name is null, but this input's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+ "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+ }
- try {
// use cp instead of scp if source and destination host and user name is same.
URI sourceURI = new URI(subTaskModel.getSource());
- URI destinationURI = new URI(subTaskModel.getDestination());
+ String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+ String targetPath = null;
+ String targetFilePath = null;
+ URI destinationURI = null;
+ if (localDataDir != null) {
+ targetPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+ taskContext.getParentProcessContext().getProcessId();
+ targetFilePath = targetPath + File.separator + fileName;
+ destinationURI = new URI("SCP", hostName, targetFilePath, null);
+ subTaskModel.setDestination(destinationURI.toString());
+
+ } else {
+ destinationURI = new URI(subTaskModel.getDestination());
+ }
if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
&& sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
@@ -145,8 +163,8 @@ public class AdvancedSCPDataStageTask implements Task{
String tokenId = taskContext.getParentProcessContext().getTokenId();
CredentialReader credentialReader = GFacUtils.getCredentialReader();
Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId);
- if (credential instanceof SSHCredential){
- SSHCredential sshCredential = (SSHCredential)credential;
+ if (credential instanceof SSHCredential) {
+ SSHCredential sshCredential = (SSHCredential) credential;
byte[] publicKey = sshCredential.getPublicKey();
publicKeyPath = writeFileToDisk(publicKey);
byte[] privateKey = sshCredential.getPrivateKey();
@@ -154,7 +172,7 @@ public class AdvancedSCPDataStageTask implements Task{
passPhrase = sshCredential.getPassphrase();
// userName = sshCredential.getPortalUserName(); // this might not same as login user name
authenticationInfo = getSSHKeyAuthentication();
- }else {
+ } else {
String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
log.error(msg);
status.setState(TaskState.FAILED);
@@ -166,8 +184,7 @@ public class AdvancedSCPDataStageTask implements Task{
return status;
}
status = new TaskStatus(TaskState.COMPLETED);
- subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
- (taskContext.getTaskModel());
+
File templocalDataDir = GFacUtils.getLocalDataDir(taskContext);
if (!templocalDataDir.exists()) {
@@ -176,37 +193,30 @@ public class AdvancedSCPDataStageTask implements Task{
}
}
- String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
- sourceURI.getPath().length());
String filePath = templocalDataDir + File.separator + fileName;
ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
- if (processState == ProcessState.INPUT_DATA_STAGING) {
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
inputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
status.setReason("Successfully staged input data");
- }else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
- String targetPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
- taskContext.getParentProcessContext().getProcessId();
- SSHUtils.makeDirectory(targetPath, sshSession);
- String targetFilePath = targetPath + File.separator + fileName;
- destinationURI = new URI("SCP", hostName, targetFilePath, null);
- subTaskModel.setDestination(destinationURI.getPath());
- // TODO - save updated subtask model with new destination
- outputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
- status.setReason("Successfully staged output data");
- }
- } catch (TException e) {
- String msg = "Couldn't create subTask model thrift model";
- log.error(msg, e);
- status.setState(TaskState.FAILED);
- status.setReason(msg);
- ErrorModel errorModel = new ErrorModel();
- errorModel.setActualErrorMessage(e.getMessage());
- errorModel.setUserFriendlyMessage(msg);
- taskContext.getTaskModel().setTaskError(errorModel);
- return status;
- } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ SSHUtils.makeDirectory(targetPath, sshSession);
+ // TODO - save updated subtask model with new destination
+ outputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
+ status.setReason("Successfully staged output data");
+ }
+ } catch (TException e) {
+ String msg = "Couldn't create subTask model thrift model";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ return status;
+ } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) {
String msg = "Failed while reading credentials";
log.error(msg, e);
status.setState(TaskState.FAILED);
@@ -243,7 +253,7 @@ public class AdvancedSCPDataStageTask implements Task{
errorModel.setActualErrorMessage(e.getMessage());
errorModel.setUserFriendlyMessage(msg);
taskContext.getTaskModel().setTaskError(errorModel);
- } catch (JSchException | IOException e ) {
+ } catch (JSchException | IOException e) {
String msg = "Failed to do scp with client";
log.error(msg, e);
status.setState(TaskState.FAILED);
@@ -263,7 +273,7 @@ public class AdvancedSCPDataStageTask implements Task{
taskContext.getTaskModel().setTaskError(errorModel);
}
return status;
- }
+ }
private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException {
StringBuilder sb = new StringBuilder("rsync -cr ");
@@ -272,38 +282,38 @@ public class AdvancedSCPDataStageTask implements Task{
taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
}
- private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
- destinationURI, String filePath) throws SSHApiException, IOException, JSchException {
- /**
- * scp remote client file to airavata local dir.
- */
- SSHUtils.scpFrom(sourceURI.getPath(), filePath, sshSession);
+ private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+ destinationURI, String filePath) throws SSHApiException, IOException, JSchException {
+ /**
+ * scp remote client file to airavata local dir.
+ */
+ SSHUtils.scpFrom(sourceURI.getPath(), filePath, sshSession);
- /**
- * scp local file to compute resource.
- */
- taskContext.getParentProcessContext().getRemoteCluster().scpTo(filePath, destinationURI.getPath());
- }
+ /**
+ * scp local file to compute resource.
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().scpTo(filePath, destinationURI.getPath());
+ }
- private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI,
- String filePath) throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
- /**
- * scp remote file from comute resource to airavata local
- */
- taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), filePath);
+ private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI,
+ String filePath) throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
+ /**
+ * scp remote file from comute resource to airavata local
+ */
+ taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), filePath);
- /**
- * scp local file to remote client
- */
- SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession);
+ /**
+ * scp local file to remote client
+ */
+ SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession);
// update output locations
GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
}
- @Override
- public TaskStatus recover(TaskContext taskContext) {
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
TaskState state = taskContext.getTaskStatus().getState();
if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
return execute(taskContext);
@@ -313,24 +323,24 @@ public class AdvancedSCPDataStageTask implements Task{
}
}
- @Override
- public TaskTypes getType() {
- return TaskTypes.DATA_STAGING;
- }
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
- private SSHPasswordAuthentication getSSHPasswordAuthentication() {
- return new SSHPasswordAuthentication(userName, password);
- }
+ private SSHPasswordAuthentication getSSHPasswordAuthentication() {
+ return new SSHPasswordAuthentication(userName, password);
+ }
- private SSHKeyAuthentication getSSHKeyAuthentication(){
- SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
- sshKA.setUserName(userName);
- sshKA.setPassphrase(passPhrase);
- sshKA.setPrivateKeyFilePath(privateKeyPath);
- sshKA.setPublicKeyFilePath(publicKeyPath);
- sshKA.setStrictHostKeyChecking("no");
- return sshKA;
- }
+ private SSHKeyAuthentication getSSHKeyAuthentication() {
+ SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+ sshKA.setUserName(userName);
+ sshKA.setPassphrase(passPhrase);
+ sshKA.setPrivateKeyFilePath(privateKeyPath);
+ sshKA.setPublicKeyFilePath(publicKeyPath);
+ sshKA.setStrictHostKeyChecking("no");
+ return sshKA;
+ }
private String writeFileToDisk(byte[] data) {
File temp = null;