You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:47:56 UTC
[31/51] [abbrv] airavata git commit: adding output data staging task
adding output data staging task
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/05da5301
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/05da5301
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/05da5301
Branch: refs/heads/master
Commit: 05da53014edf68666ccb32599127aa94d573e818
Parents: 4d9973e
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Oct 20 16:12:58 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Oct 20 16:12:58 2015 -0400
----------------------------------------------------------------------
.../core/utils/OrchestratorUtils.java | 68 +++++++++++++++++
.../cpi/impl/SimpleOrchestratorImpl.java | 79 +++++++++++++++++++-
2 files changed, 145 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/05da5301/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 9f71288..87bea94 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -27,12 +27,16 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.registry.core.app.catalog.model.*;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
@@ -73,6 +77,19 @@ public class OrchestratorUtils {
}
}
+ public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+ try {
+ GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+ String resourceHostId = model.getComputeResourceId();
+ ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId
+ , resourceHostId);
+ return preference.getPreferredDataMovementProtocol();
+ } catch (AppCatalogException e) {
+ logger.error("Error occurred while initializing app catalog", e);
+ throw new RegistryException("Error occurred while initializing app catalog", e);
+ }
+ }
+
public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
try {
GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
@@ -142,6 +159,47 @@ public class OrchestratorUtils {
}
}
+ /**
+  * Selects, from the data movement interfaces of the process's compute resource, the one whose
+  * protocol matches the protocol named in the gateway's compute resource preference.
+  *
+  * @param context      orchestrator context used to reach the app catalog
+  * @param processModel process whose compute resource is inspected
+  * @param gatewayId    gateway whose compute resource preference is consulted
+  * @return the first interface using the preferred protocol, or {@code null} when the preference
+  *         names no protocol or no interface uses it; callers must handle {@code null}
+  * @throws RegistryException if the compute resource defines no data movement interface, or the
+  *                           app catalog lookup fails
+  */
+ public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+     try {
+         String resourceHostId = processModel.getComputeResourceId();
+         ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
+         DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol();
+         ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+         List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces();
+         if (dataMovementInterfaces == null || dataMovementInterfaces.isEmpty()) {
+             throw new RegistryException("Compute resource should have at least one data movement interface defined...");
+         }
+         // Without a preferred protocol nothing can match, so skip the scan entirely.
+         if (preferredDataMovementProtocol == null) {
+             return null;
+         }
+         String preferredName = preferredDataMovementProtocol.toString();
+         for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+             // Compare by name, but tolerate an interface that has no protocol set.
+             Object candidateProtocol = dataMovementInterface.getDataMovementProtocol();
+             if (candidateProtocol != null && preferredName.equals(candidateProtocol.toString())) {
+                 return dataMovementInterface;
+             }
+         }
+         return null;
+     } catch (AppCatalogException e) {
+         throw new RegistryException("Error occurred while retrieving data from app catalog", e);
+     }
+ }
+
+ public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+ try {
+ DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId);
+ DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId);
+ if (protocol == DataMovementProtocol.SCP ) {
+ SCPDataMovement scpDataMovement = getSCPDataMovement(context, dataMovementInterface.getDataMovementInterfaceId());
+ if (scpDataMovement != null) {
+ return scpDataMovement.getSshPort();
+ }
+ }
+ } catch (RegistryException e) {
+ logger.error("Error occurred while retrieving security protocol", e);
+ }
+ return 0;
+ }
+
+
public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
try {
JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId);
@@ -201,4 +259,14 @@ public class OrchestratorUtils {
}
}
+ public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
+ try {
+ AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+ return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId);
+ } catch (Exception e) {
+ String errorMsg = "Error while retrieving SCP Data movement with submission id : " + dataMoveId;
+ logger.error(errorMsg, e);
+ throw new RegistryException(errorMsg, e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/05da5301/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ae08cf5..c76f9c6 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -22,10 +22,13 @@ package org.apache.airavata.orchestrator.cpi.impl;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.error.ValidationResults;
@@ -49,6 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ExecutorService;
@@ -279,7 +284,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
- taskIdList.addAll(createAndSaveDataStagingTasks(processModel));
+ taskIdList.addAll(createAndSaveDataStagingTasks(processModel, gatewayId));
if (autoSchedule) {
List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
@@ -347,9 +352,11 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return envTaskIds;
}
- public List<String> createAndSaveDataStagingTasks (ProcessModel processModel) throws RegistryException {
+ public List<String> createAndSaveDataStagingTasks (ProcessModel processModel, String gatewayId) throws RegistryException {
List<String> dataStagingTaskIds = new ArrayList<>();
List<InputDataObjectType> processInputs = processModel.getProcessInputs();
+ List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+
sortByInputOrder(processInputs);
if (processInputs != null) {
for (InputDataObjectType processInput : processInputs) {
@@ -376,6 +383,33 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
}
+
+ if (processOutputs != null) {
+ for (OutputDataObjectType processOutput : processOutputs) {
+ DataType type = processOutput.getType();
+ switch (type) {
+ case STDERR:
+ break;
+ case STDOUT:
+ break;
+ case URI:
+ try {
+ TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+ processModel.getProcessId());
+ outputDataStagingTask.setTaskId(taskId);
+ dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+ } catch (TException e) {
+ throw new RegistryException("Error while serializing data staging sub task model");
+ }
+ break;
+ default:
+ // nothing to do
+ break;
+ }
+ }
+ }
+
return dataStagingTaskIds;
}
@@ -451,5 +485,46 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return taskModel;
}
+ /**
+  * Builds (but does not persist) a DATA_STAGING task model describing how one process output is
+  * staged out of the compute resource.
+  * <p>
+  * The source URI is assembled from the preferred data movement protocol, the compute resource
+  * host name, the login user name, the data movement port, and the output value appended to the
+  * data storage preference's file system root location. The destination is a placeholder.
+  *
+  * @param processModel  process that owns the task
+  * @param processOutput output whose value names the file to stage
+  * @param gatewayId     gateway whose preferences are consulted
+  * @return a task model in state CREATED carrying the serialized data staging sub task
+  * @throws RegistryException if catalog lookups fail, no preferred data movement protocol is
+  *                           configured, or the source URI cannot be constructed
+  * @throws TException        if the sub task model cannot be serialized
+  */
+ private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
+     try {
+         // create new task model for this task
+         TaskModel taskModel = new TaskModel();
+         taskModel.setParentProcessId(processModel.getProcessId());
+         taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+         taskModel.setLastUpdateTime(taskModel.getCreationTime());
+         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+         taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+         taskModel.setTaskStatus(taskStatus);
+         taskModel.setTaskType(TaskTypes.DATA_STAGING);
+         // create data staging sub task model
+         DataStoragePreference dataStoragePreference = OrchestratorUtils.getDateStoragePreference(orchestratorContext, processModel, gatewayId);
+         ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+         ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+         String remoteOutputDir = dataStoragePreference.getFileSystemRootLocation();
+         remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+         DataStagingTaskModel submodel = new DataStagingTaskModel();
+         submodel.setType(DataStageType.OUPUT);
+         DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+         if (dataMovementProtocol == null) {
+             // The protocol name is the URI scheme; fail clearly instead of with a NullPointerException.
+             throw new RegistryException("No preferred data movement protocol configured for compute resource "
+                     + processModel.getComputeResourceId());
+         }
+         URI source;
+         try {
+             source = new URI(dataMovementProtocol.name(), computeResourcePreference.getLoginUserName(),
+                     computeResource.getHostName(), OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+                     remoteOutputDir + processOutput.getValue(), null, null);
+         } catch (URISyntaxException e) {
+             // Keep the cause so the offending URI component shows up in the stack trace.
+             throw new RegistryException("Error while constructing source file URI", e);
+         }
+         submodel.setSource(source.toString());
+         // We don't know destination location at this time, data staging task will set this.
+         // because destination is required field we set dummy destination
+         submodel.setDestination("dummy://temp/file/location");
+         taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+         return taskModel;
+     } catch (AppCatalogException e) {
+         throw new RegistryException("Error occurred while retrieving data movement from app catalog", e);
+     }
+ }
+
}