You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:48:05 UTC

[40/51] [abbrv] airavata git commit: fixing input staging and output staging task creation

fixing input staging and output staging task creation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/138c9949
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/138c9949
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/138c9949

Branch: refs/heads/master
Commit: 138c9949ca4fd5ed45045750707b54c7ba9bb14a
Parents: 53e3d4c
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Thu Oct 22 11:57:12 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Thu Oct 22 11:57:12 2015 -0400

----------------------------------------------------------------------
 .../core/utils/OrchestratorUtils.java           | 17 ++++--
 .../cpi/impl/SimpleOrchestratorImpl.java        | 56 +++++++++++++++-----
 2 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/138c9949/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 87bea94..01dd216 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -26,6 +26,7 @@ import java.util.*;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -38,10 +39,8 @@ import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.registry.core.app.catalog.model.*;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.GwyResourceProfile;
-import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.ApplicationInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +76,16 @@ public class OrchestratorUtils {
         }
     }
 
+    public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException {
+        try {
+            ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface();
+            ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
+            return appInterface.getApplicationName();
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error while retrieving application interface", e);
+        }
+    }
+
     public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
         try {
             GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();

http://git-wip-us.apache.org/repos/asf/airavata/blob/138c9949/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 03682e8..be65a70 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -286,7 +286,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
 
             List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
-            taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel));
+            taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
 
             if (autoSchedule) {
                 List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
@@ -354,7 +354,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return envTaskIds;
     }
 
-    public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel) throws RegistryException {
+    public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
         List<String> dataStagingTaskIds = new ArrayList<>();
         List<InputDataObjectType> processInputs = processModel.getProcessInputs();
 
@@ -369,12 +369,12 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                         break;
                     case URI:
                         try {
-                            TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput);
+                            TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId);
                             String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask,
                                     processModel.getProcessId());
                             inputDataStagingTask.setTaskId(taskId);
                             dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
-                        } catch (TException e) {
+                        } catch (TException | AppCatalogException | TaskException e) {
                             throw new RegistryException("Error while serializing data staging sub task model");
                         }
                         break;
@@ -390,12 +390,36 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
     public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
         List<String> dataStagingTaskIds = new ArrayList<>();
         List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
-
+        String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel);
         if (processOutputs != null) {
             for (OutputDataObjectType processOutput : processOutputs) {
                 DataType type = processOutput.getType();
                 switch (type) {
-                    case URI: case STDOUT : case STDERR:
+                    case STDOUT :
+                        processOutput.setValue(appName + ".stdout");
+                        try {
+                            TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+                            String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+                                    processModel.getProcessId());
+                            outputDataStagingTask.setTaskId(taskId);
+                            dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+                        } catch (TException e) {
+                            throw new RegistryException("Error while serializing data staging sub task model", e);
+                        }
+                        break;
+                    case STDERR:
+                        processOutput.setValue(appName + ".stderr");
+                        try {
+                            TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+                            String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+                                    processModel.getProcessId());
+                            outputDataStagingTask.setTaskId(taskId);
+                            dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+                        } catch (TException e) {
+                            throw new RegistryException("Error while serializing data staging sub task model", e);
+                        }
+                        break;
+                    case URI:
                         try {
                             TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
                             String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
@@ -466,7 +490,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         });
     }
 
-    private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput) throws RegistryException, TException {
+    private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException {
         // create new task model for this task
         TaskModel taskModel = new TaskModel();
         taskModel.setParentProcessId(processModel.getProcessId());
@@ -478,12 +502,22 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         taskModel.setTaskType(TaskTypes.DATA_STAGING);
         // create data staging sub task model
         DataStagingTaskModel submodel = new DataStagingTaskModel();
+        ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+        ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+        String remoteOutputDir = computeResourcePreference.getScratchLocation() + File.separator + processModel.getProcessId();
+        remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+        URI destination = null;
+        try {
+            DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+            destination = new URI(dataMovementProtocol.name(), computeResource.getHostName(),
+                    computeResourcePreference.getLoginUserName(), OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), remoteOutputDir , null, null);
+        } catch (URISyntaxException e) {
+            throw new TaskException("Error while constructing destination file URI");
+        }
         submodel.setType(DataStageType.INPUT);
         submodel.setSource(processInput.getValue());
         submodel.setProcessInput(processInput);
-        // We don't know destination location at this time, data staging task will set this.
-        // because destination is required field we set dummy destination
-        submodel.setDestination("dummy://temp/file/location");
+        submodel.setDestination(destination.toString());
         taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
         return taskModel;
     }
@@ -500,8 +534,6 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
             taskModel.setTaskStatus(taskStatus);
             taskModel.setTaskType(TaskTypes.DATA_STAGING);
-            // We assume output location is set at data storage preference
-            DataStoragePreference dataStoragePreference = OrchestratorUtils.getDateStoragePreference(orchestratorContext, processModel, gatewayId);
             ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
             ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
             String remoteOutputDir = computeResourcePreference.getScratchLocation() + File.separator + processModel.getProcessId();