You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:47:56 UTC

[31/51] [abbrv] airavata git commit: adding output data staging task

adding output data staging task


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/05da5301
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/05da5301
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/05da5301

Branch: refs/heads/master
Commit: 05da53014edf68666ccb32599127aa94d573e818
Parents: 4d9973e
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Oct 20 16:12:58 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Oct 20 16:12:58 2015 -0400

----------------------------------------------------------------------
 .../core/utils/OrchestratorUtils.java           | 68 +++++++++++++++++
 .../cpi/impl/SimpleOrchestratorImpl.java        | 79 +++++++++++++++++++-
 2 files changed, 145 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/05da5301/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 9f71288..87bea94 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -27,12 +27,16 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.registry.core.app.catalog.model.*;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
@@ -73,6 +77,19 @@ public class OrchestratorUtils {
         }
     }
 
+    /**
+     * Reads the preferred data movement protocol from the gateway's compute resource preference.
+     * The preference is looked up in the gateway profile by {@code gatewayId} and the compute
+     * resource id of {@code model}.
+     *
+     * @param context   orchestrator context used to reach the app catalog
+     * @param model     process whose compute resource id selects the preference
+     * @param gatewayId gateway whose resource profile is consulted
+     * @return the value of {@code getPreferredDataMovementProtocol()} on the resolved preference
+     * @throws RegistryException if the app catalog lookup raises an {@link AppCatalogException}
+     */
+    public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+        final String failureMessage = "Error occurred while initializing app catalog";
+        try {
+            GwyResourceProfile profile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            ComputeResourcePreference resourcePreference =
+                    profile.getComputeResourcePreference(gatewayId, model.getComputeResourceId());
+            return resourcePreference.getPreferredDataMovementProtocol();
+        } catch (AppCatalogException e) {
+            // Log and rethrow with the same text so the log entry and the exception match.
+            logger.error(failureMessage, e);
+            throw new RegistryException(failureMessage, e);
+        }
+    }
+
     public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
         try {
             GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
@@ -142,6 +159,47 @@ public class OrchestratorUtils {
         }
     }
 
+    /**
+     * Finds the data movement interface of the process's compute resource whose protocol matches
+     * the protocol preferred in the gateway's compute resource preference.
+     *
+     * @param context      orchestrator context used to reach the app catalog
+     * @param processModel process whose compute resource id selects the resource description
+     * @param gatewayId    gateway whose compute resource preference is consulted
+     * @return the first matching interface, or {@code null} when no protocol is preferred or no
+     *         interface uses the preferred protocol; callers must handle the {@code null} case
+     * @throws RegistryException if the compute resource defines no data movement interfaces, or
+     *                           the app catalog lookup fails
+     */
+    public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String resourceHostId = processModel.getComputeResourceId();
+            ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
+            DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol();
+            ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces();
+            if (dataMovementInterfaces == null || dataMovementInterfaces.isEmpty()) {
+                throw new RegistryException("Compute resource should have at least one data movement interface defined...");
+            }
+            // Without a preferred protocol nothing can match; decide once instead of per iteration.
+            if (preferredDataMovementProtocol == null) {
+                return null;
+            }
+            for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+                // Enum constants are compared by identity, which also tolerates an interface
+                // whose protocol is unset (the former toString() comparison threw an NPE there).
+                if (preferredDataMovementProtocol == dataMovementInterface.getDataMovementProtocol()) {
+                    return dataMovementInterface;
+                }
+            }
+            return null;
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error occurred while retrieving data from app catalog", e);
+        }
+    }
+
+    /**
+     * Resolves the port to use for data movement to the process's compute resource.
+     * <p>
+     * Only the SCP protocol is handled: the SSH port of the SCP data movement registered for the
+     * preferred data movement interface is returned. This is a best-effort lookup; registry
+     * failures are logged rather than propagated.
+     *
+     * @param context      orchestrator context used to reach the app catalog
+     * @param processModel process whose compute resource is inspected
+     * @param gatewayId    gateway whose compute resource preference is consulted
+     * @return the SSH port, or {@code 0} when the preferred protocol is not SCP, no matching
+     *         interface or SCP data movement exists, or a registry lookup failed
+     * @throws RegistryException declared for interface compatibility; lookup failures inside this
+     *                           method are caught and logged
+     */
+    public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+        try {
+            DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId);
+            DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId);
+            // The interface lookup returns null when nothing matches the preferred protocol,
+            // so guard before dereferencing it.
+            if (protocol == DataMovementProtocol.SCP && dataMovementInterface != null) {
+                SCPDataMovement scpDataMovement = getSCPDataMovement(context, dataMovementInterface.getDataMovementInterfaceId());
+                if (scpDataMovement != null) {
+                    return scpDataMovement.getSshPort();
+                }
+            }
+        } catch (RegistryException e) {
+            logger.error("Error occurred while retrieving data movement port", e);
+        }
+        return 0;
+    }
+
+
     public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
         try {
             JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId);
@@ -201,4 +259,14 @@ public class OrchestratorUtils {
         }
     }
 
+    /**
+     * Fetches the SCP data movement registered in the app catalog under the given id.
+     *
+     * @param context    orchestrator context used to reach the app catalog
+     * @param dataMoveId id of the data movement interface to look up
+     * @return whatever the app catalog returns for {@code dataMoveId}
+     * @throws RegistryException if any exception is raised during the lookup; the original
+     *                           exception is kept as the cause
+     */
+    public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId);
+        } catch (Exception e) {
+            // The id identifies a data movement interface, not a job submission.
+            String errorMsg = "Error while retrieving SCP Data movement with data movement id : " + dataMoveId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/05da5301/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ae08cf5..c76f9c6 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -22,10 +22,13 @@ package org.apache.airavata.orchestrator.cpi.impl;
 
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.error.ValidationResults;
@@ -49,6 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 
@@ -279,7 +284,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
 
             List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
-            taskIdList.addAll(createAndSaveDataStagingTasks(processModel));
+            taskIdList.addAll(createAndSaveDataStagingTasks(processModel, gatewayId));
 
             if (autoSchedule) {
                 List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
@@ -347,9 +352,11 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return envTaskIds;
     }
 
-    public List<String> createAndSaveDataStagingTasks (ProcessModel processModel) throws RegistryException {
+    public List<String> createAndSaveDataStagingTasks (ProcessModel processModel, String gatewayId) throws RegistryException {
         List<String> dataStagingTaskIds = new ArrayList<>();
         List<InputDataObjectType> processInputs = processModel.getProcessInputs();
+        List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+
         sortByInputOrder(processInputs);
         if (processInputs != null) {
             for (InputDataObjectType processInput : processInputs) {
@@ -376,6 +383,33 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                 }
             }
         }
+
+        if (processOutputs != null) {
+            for (OutputDataObjectType processOutput : processOutputs) {
+                DataType type = processOutput.getType();
+                switch (type) {
+                    case STDERR:
+                        break;
+                    case STDOUT:
+                        break;
+                    case URI:
+                        try {
+                            TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+                            String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+                                    processModel.getProcessId());
+                            outputDataStagingTask.setTaskId(taskId);
+                            dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+                        } catch (TException e) {
+                            throw new RegistryException("Error while serializing data staging sub task model");
+                        }
+                        break;
+                    default:
+                        // nothing to do
+                        break;
+                }
+            }
+        }
+
         return dataStagingTaskIds;
     }
 
@@ -451,5 +485,46 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return taskModel;
     }
 
+    /**
+     * Builds the task model (not persisted here) for staging one process output.
+     * <p>
+     * The task is created in state {@code CREATED} with type {@code DATA_STAGING}. Its serialized
+     * sub task model has a source URI of the form
+     * {@code <protocol>://<login user>@<compute host>:<port>/<storage root>/<output value>} and a
+     * placeholder destination.
+     *
+     * @param processModel  process the task belongs to
+     * @param processOutput output whose value is appended to the storage root to form the source path
+     * @param gatewayId     gateway whose storage and compute resource preferences are consulted
+     * @return the populated task model
+     * @throws RegistryException if no data movement protocol is preferred, the source URI cannot
+     *                           be constructed, or an app catalog lookup fails
+     * @throws TException        if the sub task model cannot be serialized
+     */
+    private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
+        try {
+
+            // create new task model for this task
+            TaskModel taskModel = new TaskModel();
+            taskModel.setParentProcessId(processModel.getProcessId());
+            taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+            taskModel.setLastUpdateTime(taskModel.getCreationTime());
+            TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskModel.setTaskStatus(taskStatus);
+            taskModel.setTaskType(TaskTypes.DATA_STAGING);
+            // create data staging sub task model
+            DataStoragePreference dataStoragePreference = OrchestratorUtils.getDateStoragePreference(orchestratorContext, processModel, gatewayId);
+            ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+            ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+            String remoteOutputDir = dataStoragePreference.getFileSystemRootLocation();
+            remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+            DataStagingTaskModel submodel = new DataStagingTaskModel();
+            submodel.setType(DataStageType.OUPUT);
+            DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+            if (dataMovementProtocol == null) {
+                // Fail with a descriptive error instead of an NPE on name() below.
+                throw new RegistryException("No preferred data movement protocol is configured for compute resource "
+                        + processModel.getComputeResourceId());
+            }
+            // NOTE(review): getDataMovementPort yields 0 when the port is unknown, which ends up
+            // as ":0" in the URI - confirm how the staging task treats that value.
+            int port = OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId);
+            URI source;
+            try {
+                // Argument order is (scheme, userInfo, host, port, path, query, fragment):
+                // the login user is the user-info part and the compute resource is the host.
+                source = new URI(dataMovementProtocol.name(), computeResourcePreference.getLoginUserName(),
+                        computeResource.getHostName(), port, remoteOutputDir + processOutput.getValue(), null, null);
+            } catch (URISyntaxException e) {
+                // Keep the cause so the offending URI component can be diagnosed.
+                throw new RegistryException("Error while constructing source file URI", e);
+            }
+            submodel.setSource(source.toString());
+            // We don't know destination location at this time, data staging task will set this.
+            // because destination is required field we set dummy destination
+            submodel.setDestination("dummy://temp/file/location");
+            taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+            return taskModel;
+        } catch (AppCatalogException e) {
+           throw new RegistryException("Error occurred while retrieving data movement from app catalog", e);
+        }
+    }
+
 
 }