You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:48:05 UTC
[40/51] [abbrv] airavata git commit: fixing input staging and output
staging task creation
fixing input staging and output staging task creation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/138c9949
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/138c9949
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/138c9949
Branch: refs/heads/master
Commit: 138c9949ca4fd5ed45045750707b54c7ba9bb14a
Parents: 53e3d4c
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Thu Oct 22 11:57:12 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Thu Oct 22 11:57:12 2015 -0400
----------------------------------------------------------------------
.../core/utils/OrchestratorUtils.java | 17 ++++--
.../cpi/impl/SimpleOrchestratorImpl.java | 56 +++++++++++++++-----
2 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/138c9949/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 87bea94..01dd216 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -26,6 +26,7 @@ import java.util.*;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -38,10 +39,8 @@ import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.registry.core.app.catalog.model.*;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.GwyResourceProfile;
-import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.ApplicationInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +76,16 @@ public class OrchestratorUtils {
}
}
+ public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException {
+ try {
+ ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface();
+ ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
+ return appInterface.getApplicationName();
+ } catch (AppCatalogException e) {
+ throw new RegistryException("Error while retrieving application interface", e);
+ }
+ }
+
public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
try {
GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
http://git-wip-us.apache.org/repos/asf/airavata/blob/138c9949/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 03682e8..be65a70 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -286,7 +286,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
- taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel));
+ taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
if (autoSchedule) {
List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
@@ -354,7 +354,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return envTaskIds;
}
- public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel) throws RegistryException {
+ public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
List<String> dataStagingTaskIds = new ArrayList<>();
List<InputDataObjectType> processInputs = processModel.getProcessInputs();
@@ -369,12 +369,12 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
break;
case URI:
try {
- TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput);
+ TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId);
String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask,
processModel.getProcessId());
inputDataStagingTask.setTaskId(taskId);
dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
- } catch (TException e) {
+ } catch (TException | AppCatalogException | TaskException e) {
throw new RegistryException("Error while serializing data staging sub task model");
}
break;
@@ -390,12 +390,36 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
List<String> dataStagingTaskIds = new ArrayList<>();
List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
-
+ String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel);
if (processOutputs != null) {
for (OutputDataObjectType processOutput : processOutputs) {
DataType type = processOutput.getType();
switch (type) {
- case URI: case STDOUT : case STDERR:
+ case STDOUT :
+ processOutput.setValue(appName + ".stdout");
+ try {
+ TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+ processModel.getProcessId());
+ outputDataStagingTask.setTaskId(taskId);
+ dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+ } catch (TException e) {
+ throw new RegistryException("Error while serializing data staging sub task model", e);
+ }
+ break;
+ case STDERR:
+ processOutput.setValue(appName + ".stderr");
+ try {
+ TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+ processModel.getProcessId());
+ outputDataStagingTask.setTaskId(taskId);
+ dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+ } catch (TException e) {
+ throw new RegistryException("Error while serializing data staging sub task model", e);
+ }
+ break;
+ case URI:
try {
TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
@@ -466,7 +490,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
});
}
- private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput) throws RegistryException, TException {
+ private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException {
// create new task model for this task
TaskModel taskModel = new TaskModel();
taskModel.setParentProcessId(processModel.getProcessId());
@@ -478,12 +502,22 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskModel.setTaskType(TaskTypes.DATA_STAGING);
// create data staging sub task model
DataStagingTaskModel submodel = new DataStagingTaskModel();
+ ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+ ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+ String remoteOutputDir = computeResourcePreference.getScratchLocation() + File.separator + processModel.getProcessId();
+ remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+ URI destination = null;
+ try {
+ DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+ destination = new URI(dataMovementProtocol.name(), computeResource.getHostName(),
+ computeResourcePreference.getLoginUserName(), OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), remoteOutputDir , null, null);
+ } catch (URISyntaxException e) {
+ throw new TaskException("Error while constructing destination file URI");
+ }
submodel.setType(DataStageType.INPUT);
submodel.setSource(processInput.getValue());
submodel.setProcessInput(processInput);
- // We don't know destination location at this time, data staging task will set this.
- // because destination is required field we set dummy destination
- submodel.setDestination("dummy://temp/file/location");
+ submodel.setDestination(destination.toString());
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
return taskModel;
}
@@ -500,8 +534,6 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
taskModel.setTaskStatus(taskStatus);
taskModel.setTaskType(TaskTypes.DATA_STAGING);
- // We assume output location is set at data storage preference
- DataStoragePreference dataStoragePreference = OrchestratorUtils.getDateStoragePreference(orchestratorContext, processModel, gatewayId);
ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
String remoteOutputDir = computeResourcePreference.getScratchLocation() + File.separator + processModel.getProcessId();