You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2016/03/11 07:57:24 UTC

[08/16] airavata git commit: Completed ArchiveTask

Completed ArchiveTask


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4e80197d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4e80197d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4e80197d

Branch: refs/heads/master
Commit: 4e80197d71a92da3451c42fe8e166786315f5456
Parents: 75fb3b3
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Mar 7 16:24:33 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Mar 7 16:24:33 2016 -0500

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/impl/Factory.java  |   5 +
 .../airavata/gfac/impl/GFacEngineImpl.java      |  19 ++-
 .../airavata/gfac/impl/task/ArchiveTask.java    | 161 ++++++++++++++++++-
 .../cpi/impl/SimpleOrchestratorImpl.java        |  62 ++++---
 4 files changed, 216 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 0b70b0b..7b01ffe 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -53,6 +53,7 @@ import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
 import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.task.ArchiveTask;
 import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
 import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
@@ -449,6 +450,10 @@ public abstract class Factory {
 
 	}
 
+	public static Task getArchiveTask() {
+		return new ArchiveTask();
+	}
+
 	private static class DefaultUserInfo implements UserInfo {
 
 		private String userName;

http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 5e7774f..0622e58 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -296,10 +296,15 @@ public class GFacEngineImpl implements GFacEngine {
                                 processContext.setProcessStatus(status);
                                 GFacUtils.saveAndPublishProcessStatus(processContext);
                                 taskContext.setProcessOutput(subTaskModel.getProcessOutput());
-                                outputDataStaging(taskContext, processContext.isRecovery());
+                                outputDataStaging(taskContext, processContext.isRecovery(), false);
                                 break;
                             case ARCHIVE_OUTPUT:
-                                // TODO - implement output archive logic
+                                status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+                                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                processContext.setProcessStatus(status);
+                                GFacUtils.saveAndPublishProcessStatus(processContext);
+                                outputDataStaging(taskContext, processContext.isRecovery(), true);
+                                break;
 
                         }
                         // checkpoint
@@ -668,14 +673,19 @@ public class GFacEngineImpl implements GFacEngine {
      * @return <code>true</code> if process execution interrupted , <code>false</code> otherwise.
      * @throws GFacException
      */
-    private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException {
+    private boolean outputDataStaging(TaskContext taskContext, boolean recovery, boolean isArchive) throws GFacException {
         TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
         taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
         taskContext.setTaskStatus(taskStatus);
         GFacUtils.saveAndPublishTaskStatus(taskContext);
 
         ProcessContext processContext = taskContext.getParentProcessContext();
-        Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+        Task dMoveTask = null;
+        if (isArchive) {
+            dMoveTask = Factory.getArchiveTask();
+        } else {
+            dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+        }
         taskStatus = executeTask(taskContext, dMoveTask, recovery);
         taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
         taskContext.setTaskStatus(taskStatus);
@@ -699,6 +709,7 @@ public class GFacEngineImpl implements GFacEngine {
         return false;
     }
 
+
     @Override
     public void cancelProcess(ProcessContext processContext) throws GFacException {
         if (processContext != null) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index d51c389..f6988f2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -20,16 +20,50 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.StandardOutReader;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 
 public class ArchiveTask implements Task {
+
+    private static final Logger log = LoggerFactory.getLogger(ArchiveTask.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+    private String hostName;
+    private String userName;
+    private String inputPath;
+
     @Override
     public void init(Map<String, String> propertyMap) throws TaskException {
 
@@ -38,9 +72,86 @@ public class ArchiveTask implements Task {
     @Override
     public TaskStatus execute(TaskContext taskContext) {
         // implement archive logic with jscp
-        return new TaskStatus(TaskState.COMPLETED);
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster();
+        AuthenticationInfo authenticationInfo = null;
+
+        StorageResourceDescription storageResource = taskContext.getParentProcessContext().getStorageResource();
+        StoragePreference storagePreference = taskContext.getParentProcessContext().getStoragePreference();
+
+        if (storageResource != null){
+            hostName = storageResource.getHostName();
+        }
+
+        if (storagePreference != null){
+            userName = storagePreference.getLoginUserName();
+            inputPath = storagePreference.getFileSystemRootLocation();
+        }
+        DataStagingTaskModel subTaskModel = null;
+        try {
+            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+                    (taskContext.getTaskModel());
+        } catch (TException e) {
+            String msg = "Error! Deserialization issue with SubTask Model";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+            return status;
+        }
+
+        try {
+            authenticationInfo = Factory.getStorageSSHKeyAuthentication(taskContext.getParentProcessContext());
+            status = new TaskStatus(TaskState.COMPLETED);
+
+            ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+            URI sourceURI = new URI(subTaskModel.getSource());
+            URI destinationURI = null;
+            String workingDirName = null, path = null;
+            if (sourceURI.getPath().endsWith("/")) {
+                path = sourceURI.getPath().substring(0, sourceURI.getPath().length() - 1);
+            } else {
+                path = sourceURI.getPath();
+            }
+            workingDirName = path.substring(path.lastIndexOf(File.separator) + 1, path.length());
+            // tar working dir
+            // cd /Users/syodage/Desktop/temp/.. && tar -cvf path/workingDir.tar temp
+            String archiveTar = workingDirName + ".tar";
+            CommandInfo commandInfo = new RawCommandInfo("cd " + path + "/.. && tar -cvf "
+                    + path + "/" + archiveTar + " " + workingDirName);
+
+            // move tar to storage resource
+            path += "/" + archiveTar;
+            remoteCluster.execute(commandInfo);
+            destinationURI = getDestinationURI(taskContext, archiveTar);
+
+            remoteCluster.scpThirdParty(path ,destinationURI.getPath() , sshSession, RemoteCluster.DIRECTION.FROM, true);
+
+            // untar file and delete tar
+            String destPath = destinationURI.getPath();
+            String destParent = destPath.substring(0, destPath.lastIndexOf("/"));
+            commandInfo = new RawCommandInfo("cd " + destParent + " && tar -xvf " + archiveTar + " && rm " + archiveTar);
+            executeCommand(sshSession, commandInfo, new StandardOutReader());
+        } catch (GFacException | AiravataException | URISyntaxException | SSHApiException e) {
+            String msg = "Error! Archive task failed";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        }
+        return status;
     }
 
+
     @Override
     public TaskStatus recover(TaskContext taskContext) {
         return new TaskStatus(TaskState.COMPLETED);
@@ -50,4 +161,52 @@ public class ArchiveTask implements Task {
     public TaskTypes getType() {
         return TaskTypes.DATA_STAGING;
     }
+
+    public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException {
+        String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+        String filePath;
+        if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+            if(experimentDataDir.startsWith(File.separator)){
+                experimentDataDir = experimentDataDir.substring(1);
+            }
+            if(!experimentDataDir.endsWith(File.separator)){
+                experimentDataDir += File.separator;
+            }
+            filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) + experimentDataDir +
+                    taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+        } else {
+            filePath =(inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) +
+                    taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+        }
+        return new URI("SCP", hostName, filePath, null);
+
+    }
+
+
+    private void executeCommand(Session session,CommandInfo commandInfo, CommandOutput commandOutput) throws SSHApiException {
+        String command = commandInfo.getCommand();
+        ChannelExec channelExec = null;
+        try {
+            if (!session.isConnected()) {
+//                session = getOpenSession();
+                log.error("Error! client session is closed");
+                throw new JSchException("Error! client session is closed");
+            }
+            channelExec = ((ChannelExec) session.openChannel("exec"));
+            channelExec.setCommand(command);
+            channelExec.setInputStream(null);
+            channelExec.setErrStream(commandOutput.getStandardError());
+            log.info("Executing command {}", commandInfo.getCommand());
+            channelExec.connect();
+            commandOutput.onOutput(channelExec);
+        } catch (JSchException e) {
+            throw new SSHApiException("Unable to execute command - ", e);
+        }finally {
+            //Only disconnecting the channel, session can be reused
+            if (channelExec != null) {
+                commandOutput.exitCode(channelExec.getExitStatus());
+                channelExec.disconnect();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4e80197d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 20471df..ffcb89a 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -413,9 +413,27 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                 }
             }
         }
+
+        if (processModel.isArchive()) {
+            createArchiveDataStatgingTask(processModel, gatewayId, dataStagingTaskIds);
+        }
         return dataStagingTaskIds;
     }
 
+    private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException {
+        TaskModel archiveTask = null;
+        try {
+            archiveTask = getOutputDataStagingTask(processModel, null, gatewayId);
+        } catch (TException e) {
+            throw new RegistryException("Error! DataStaging sub task serialization failed");
+        }
+        String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask,
+                processModel.getProcessId());
+        archiveTask.setTaskId(taskId);
+        dataStagingTaskIds.add(archiveTask.getTaskId());
+
+    }
+
     private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException {
         try {
             TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
@@ -545,16 +563,26 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             String remoteOutputDir = computeResourcePreference.getScratchLocation() + File.separator + processModel.getProcessId();
             remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
             DataStagingTaskModel submodel = new DataStagingTaskModel();
-            submodel.setType(DataStageType.OUPUT);
-            submodel.setProcessOutput(processOutput);
+            DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
             URI source = null;
             try {
-                DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
-                source = new URI(dataMovementProtocol.name(),
-                        computeResourcePreference.getLoginUserName(),
-                        computeResource.getHostName(),
-                        OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
-                        remoteOutputDir + processOutput.getValue(), null, null);
+                if (processOutput != null) {
+                    submodel.setType(DataStageType.OUPUT);
+                    submodel.setProcessOutput(processOutput);
+                    source = new URI(dataMovementProtocol.name(),
+                            computeResourcePreference.getLoginUserName(),
+                            computeResource.getHostName(),
+                            OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+                            remoteOutputDir + processOutput.getValue(), null, null);
+                } else {
+                    // archive
+                    submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+                    source = new URI(dataMovementProtocol.name(),
+                            computeResourcePreference.getLoginUserName(),
+                            computeResource.getHostName(),
+                            OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+                            remoteOutputDir, null, null);
+                }
             } catch (URISyntaxException e) {
                 throw new TaskException("Error while constructing source file URI");
             }
@@ -572,22 +600,4 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
     }
 
 
-    private TaskModel getArchiveOutTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
-
-        // create new task model for this task
-        TaskModel taskModel = new TaskModel();
-        taskModel.setParentProcessId(processModel.getProcessId());
-        taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-        taskModel.setLastUpdateTime(taskModel.getCreationTime());
-        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
-        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        taskModel.setTaskStatus(taskStatus);
-        taskModel.setTaskType(TaskTypes.DATA_STAGING);
-        DataStagingTaskModel submodel = new DataStagingTaskModel();
-        submodel.setType(DataStageType.ARCHIVE_OUTPUT);
-        taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
-        return taskModel;
-    }
-
-
 }