You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2016/03/11 07:57:24 UTC
[08/16] airavata git commit: Completed ArchiveTask
Completed ArchiveTask
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4e80197d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4e80197d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4e80197d
Branch: refs/heads/master
Commit: 4e80197d71a92da3451c42fe8e166786315f5456
Parents: 75fb3b3
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Mar 7 16:24:33 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Mar 7 16:24:33 2016 -0500
----------------------------------------------------------------------
.../org/apache/airavata/gfac/impl/Factory.java | 5 +
.../airavata/gfac/impl/GFacEngineImpl.java | 19 ++-
.../airavata/gfac/impl/task/ArchiveTask.java | 161 ++++++++++++++++++-
.../cpi/impl/SimpleOrchestratorImpl.java | 62 ++++---
4 files changed, 216 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 0b70b0b..7b01ffe 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -53,6 +53,7 @@ import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.task.ArchiveTask;
import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
@@ -449,6 +450,10 @@ public abstract class Factory {
}
+ public static Task getArchiveTask() {
+ return new ArchiveTask();
+ }
+
private static class DefaultUserInfo implements UserInfo {
private String userName;
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 5e7774f..0622e58 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -296,10 +296,15 @@ public class GFacEngineImpl implements GFacEngine {
processContext.setProcessStatus(status);
GFacUtils.saveAndPublishProcessStatus(processContext);
taskContext.setProcessOutput(subTaskModel.getProcessOutput());
- outputDataStaging(taskContext, processContext.isRecovery());
+ outputDataStaging(taskContext, processContext.isRecovery(), false);
break;
case ARCHIVE_OUTPUT:
- // TODO - implement output archive logic
+ status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ outputDataStaging(taskContext, processContext.isRecovery(), true);
+ break;
}
// checkpoint
@@ -668,14 +673,19 @@ public class GFacEngineImpl implements GFacEngine {
* @return <code>true</code> if process execution interrupted , <code>false</code> otherwise.
* @throws GFacException
*/
- private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException {
+ private boolean outputDataStaging(TaskContext taskContext, boolean recovery, boolean isArchive) throws GFacException {
TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskContext.setTaskStatus(taskStatus);
GFacUtils.saveAndPublishTaskStatus(taskContext);
ProcessContext processContext = taskContext.getParentProcessContext();
- Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+ Task dMoveTask = null;
+ if (isArchive) {
+ dMoveTask = Factory.getArchiveTask();
+ } else {
+ dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+ }
taskStatus = executeTask(taskContext, dMoveTask, recovery);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskContext.setTaskStatus(taskStatus);
@@ -699,6 +709,7 @@ public class GFacEngineImpl implements GFacEngine {
return false;
}
+
@Override
public void cancelProcess(ProcessContext processContext) throws GFacException {
if (processContext != null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index d51c389..f6988f2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -20,16 +20,50 @@
*/
package org.apache.airavata.gfac.impl.task;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.StandardOutReader;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Map;
public class ArchiveTask implements Task {
+
+ private static final Logger log = LoggerFactory.getLogger(ArchiveTask.class);
+ private static final int DEFAULT_SSH_PORT = 22;
+ private String hostName;
+ private String userName;
+ private String inputPath;
+
@Override
public void init(Map<String, String> propertyMap) throws TaskException {
@@ -38,9 +72,86 @@ public class ArchiveTask implements Task {
@Override
public TaskStatus execute(TaskContext taskContext) {
// implement archive logic with jscp
- return new TaskStatus(TaskState.COMPLETED);
+ TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+ AuthenticationInfo authenticationInfo = null;
+
+ StorageResourceDescription storageResource = taskContext.getParentProcessContext().getStorageResource();
+ StoragePreference storagePreference = taskContext.getParentProcessContext().getStoragePreference();
+
+ if (storageResource != null){
+ hostName = storageResource.getHostName();
+ }
+
+ if (storagePreference != null){
+ userName = storagePreference.getLoginUserName();
+ inputPath = storagePreference.getFileSystemRootLocation();
+ }
+ DataStagingTaskModel subTaskModel = null;
+ try {
+ subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+ (taskContext.getTaskModel());
+ } catch (TException e) {
+ String msg = "Error! Deserialization issue with SubTask Model";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ return status;
+ }
+
+ try {
+ authenticationInfo = Factory.getStorageSSHKeyAuthentication(taskContext.getParentProcessContext());
+ status = new TaskStatus(TaskState.COMPLETED);
+
+ ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+ Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+ URI sourceURI = new URI(subTaskModel.getSource());
+ URI destinationURI = null;
+ String workingDirName = null, path = null;
+ if (sourceURI.getPath().endsWith("/")) {
+ path = sourceURI.getPath().substring(0, sourceURI.getPath().length() - 1);
+ } else {
+ path = sourceURI.getPath();
+ }
+ workingDirName = path.substring(path.lastIndexOf(File.separator) + 1, path.length());
+ // tar working dir
+ // cd /Users/syodage/Desktop/temp/.. && tar -cvf path/workingDir.tar temp
+ String archiveTar = workingDirName + ".tar";
+ CommandInfo commandInfo = new RawCommandInfo("cd " + path + "/.. && tar -cvf "
+ + path + "/" + archiveTar + " " + workingDirName);
+
+ // move tar to storage resource
+ path += "/" + archiveTar;
+ remoteCluster.execute(commandInfo);
+ destinationURI = getDestinationURI(taskContext, archiveTar);
+
+ remoteCluster.scpThirdParty(path ,destinationURI.getPath() , sshSession, RemoteCluster.DIRECTION.FROM, true);
+
+ // untar file and delete tar
+ String destPath = destinationURI.getPath();
+ String destParent = destPath.substring(0, destPath.lastIndexOf("/"));
+ commandInfo = new RawCommandInfo("cd " + destParent + " && tar -xvf " + archiveTar + " && rm " + archiveTar);
+ executeCommand(sshSession, commandInfo, new StandardOutReader());
+ } catch (GFacException | AiravataException | URISyntaxException | SSHApiException e) {
+ String msg = "Error! Archive task failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+ return status;
}
+
@Override
public TaskStatus recover(TaskContext taskContext) {
return new TaskStatus(TaskState.COMPLETED);
@@ -50,4 +161,52 @@ public class ArchiveTask implements Task {
public TaskTypes getType() {
return TaskTypes.DATA_STAGING;
}
+
+ public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
+ String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+ String filePath;
+ if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+ if(experimentDataDir.startsWith(File.separator)){
+ experimentDataDir = experimentDataDir.substring(1);
+ }
+ if(!experimentDataDir.endsWith(File.separator)){
+ experimentDataDir += File.separator;
+ }
+ filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) + experimentDataDir +
+ taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ } else {
+ filePath =(inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+ taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ }
+ return new URI("SCP", hostName, filePath, null);
+
+ }
+
+
+ private void executeCommand(Session session,CommandInfo commandInfo, CommandOutput commandOutput) throws SSHApiException {
+ String command = commandInfo.getCommand();
+ ChannelExec channelExec = null;
+ try {
+ if (!session.isConnected()) {
+// session = getOpenSession();
+ log.error("Error! client session is closed");
+ throw new JSchException("Error! client session is closed");
+ }
+ channelExec = ((ChannelExec) session.openChannel("exec"));
+ channelExec.setCommand(command);
+ channelExec.setInputStream(null);
+ channelExec.setErrStream(commandOutput.getStandardError());
+ log.info("Executing command {}", commandInfo.getCommand());
+ channelExec.connect();
+ commandOutput.onOutput(channelExec);
+ } catch (JSchException e) {
+ throw new SSHApiException("Unable to execute command - ", e);
+ }finally {
+ //Only disconnecting the channel, session can be reused
+ if (channelExec != null) {
+ commandOutput.exitCode(channelExec.getExitStatus());
+ channelExec.disconnect();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 20471df..ffcb89a 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -413,9 +413,27 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
}
+
+ if (processModel.isArchive()) {
+ createArchiveDataStatgingTask(processModel, gatewayId, dataStagingTaskIds);
+ }
return dataStagingTaskIds;
}
+ private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException {
+ TaskModel archiveTask = null;
+ try {
+ archiveTask = getOutputDataStagingTask(processModel, null, gatewayId);
+ } catch (TException e) {
+ throw new RegistryException("Error! DataStaging sub task serialization failed");
+ }
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask,
+ processModel.getProcessId());
+ archiveTask.setTaskId(taskId);
+ dataStagingTaskIds.add(archiveTask.getTaskId());
+
+ }
+
private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException {
try {
TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
@@ -545,16 +563,26 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
String remoteOutputDir = computeResourcePreference.getScratchLocation() + File.separator + processModel.getProcessId();
remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
DataStagingTaskModel submodel = new DataStagingTaskModel();
- submodel.setType(DataStageType.OUPUT);
- submodel.setProcessOutput(processOutput);
+ DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
URI source = null;
try {
- DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
- source = new URI(dataMovementProtocol.name(),
- computeResourcePreference.getLoginUserName(),
- computeResource.getHostName(),
- OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
- remoteOutputDir + processOutput.getValue(), null, null);
+ if (processOutput != null) {
+ submodel.setType(DataStageType.OUPUT);
+ submodel.setProcessOutput(processOutput);
+ source = new URI(dataMovementProtocol.name(),
+ computeResourcePreference.getLoginUserName(),
+ computeResource.getHostName(),
+ OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+ remoteOutputDir + processOutput.getValue(), null, null);
+ } else {
+ // archive
+ submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+ source = new URI(dataMovementProtocol.name(),
+ computeResourcePreference.getLoginUserName(),
+ computeResource.getHostName(),
+ OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+ remoteOutputDir, null, null);
+ }
} catch (URISyntaxException e) {
throw new TaskException("Error while constructing source file URI");
}
@@ -572,22 +600,4 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
- private TaskModel getArchiveOutTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
-
- // create new task model for this task
- TaskModel taskModel = new TaskModel();
- taskModel.setParentProcessId(processModel.getProcessId());
- taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setLastUpdateTime(taskModel.getCreationTime());
- TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
- taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setTaskStatus(taskStatus);
- taskModel.setTaskType(TaskTypes.DATA_STAGING);
- DataStagingTaskModel submodel = new DataStagingTaskModel();
- submodel.setType(DataStageType.ARCHIVE_OUTPUT);
- taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
- return taskModel;
- }
-
-
}