You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2021/12/15 21:54:15 UTC
[airavata] branch AIRAVATA-3549 updated: AIRAVATA-3549 Initial API/Orch changes for fetching intermediate outputs
This is an automated email from the ASF dual-hosted git repository.
machristie pushed a commit to branch AIRAVATA-3549
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/AIRAVATA-3549 by this push:
new 5ef1a8a AIRAVATA-3549 Initial API/Orch changes for fetching intermediate outputs
5ef1a8a is described below
commit 5ef1a8a730e0e0f5fcc78be4ade0eff400152b8c
Author: Marcus Christie <ma...@apache.org>
AuthorDate: Wed Dec 15 16:54:03 2021 -0500
AIRAVATA-3549 Initial API/Orch changes for fetching intermediate outputs
---
.../api/server/handler/AiravataServerHandler.java | 17 ++++-
.../cpi/impl/SimpleOrchestratorImpl.java | 77 ++++++++++++++++++++
.../server/OrchestratorServerHandler.java | 83 ++++++++++++++++++++++
3 files changed, 175 insertions(+), 2 deletions(-)
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 088c909..924de01 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -71,6 +71,7 @@ import org.apache.airavata.model.experiment.*;
import org.apache.airavata.model.group.ResourcePermissionType;
import org.apache.airavata.model.group.ResourceType;
import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.ExperimentIntermediateOutputsEvent;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
import org.apache.airavata.model.messaging.event.MessageType;
@@ -1944,8 +1945,13 @@ public class AiravataServerHandler implements Airavata.Iface {
public void fetchIntermediateOutputs(AuthzToken authzToken, String airavataExperimentId, List<String> outputNames)
throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException,
AiravataSystemException, AuthorizationException, TException {
- // TODO Publish message to experiment channel with MessageType.INTERMEDIATE_OUTPUTS
- // TODO create an event of type ExperimentIntermediateOutputsEvent
+ // TODO: Verify user has access to experiment
+ // The gateway id is read from the claims map carried by the caller's authorization token.
+ String gatewayId = authzToken.getClaimsMap().get(Constants.GATEWAY_ID);
+ try {
+ // Only hands the request off as an INTERMEDIATE_OUTPUTS message; no output is fetched in this call itself.
+ submitExperimentIntermediateOutputsEvent(gatewayId, airavataExperimentId, outputNames);
+ } catch (AiravataException e) {
+ // NOTE(review): this surfaces as an unchecked RuntimeException rather than one of the
+ // exceptions declared in the throws clause - confirm that is the intended contract.
+ throw new RuntimeException("Failed to submit intermediate outputs event", e);
+ }
}
@SecurityCheck
@@ -6180,6 +6186,13 @@ public class AiravataServerHandler implements Airavata.Iface {
experimentPublisher.publish(messageContext);
}
+    /**
+     * Wraps a request for intermediate outputs in an {@link ExperimentIntermediateOutputsEvent}
+     * and publishes it through the experiment publisher as an INTERMEDIATE_OUTPUTS message.
+     *
+     * @param gatewayId    gateway id stored in the event and used as the message's gateway id
+     * @param experimentId id of the experiment whose outputs are requested
+     * @param outputNames  names of the outputs being requested
+     * @throws AiravataException declared by this method; propagated from the calls made here
+     */
+    private void submitExperimentIntermediateOutputsEvent(String gatewayId, String experimentId, List<String> outputNames) throws AiravataException {
+        final ExperimentIntermediateOutputsEvent outputsEvent =
+                new ExperimentIntermediateOutputsEvent(experimentId, gatewayId, outputNames);
+        // Every message gets its own random id under the INTERMEDIATE_OUTPUTS.EXP- prefix.
+        final String messageId = "INTERMEDIATE_OUTPUTS.EXP-" + UUID.randomUUID().toString();
+        final MessageContext outputsMessage =
+                new MessageContext(outputsEvent, MessageType.INTERMEDIATE_OUTPUTS, messageId, gatewayId);
+        outputsMessage.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        experimentPublisher.publish(outputsMessage);
+    }
+
private void shareEntityWithAdminGatewayGroups(RegistryService.Client regClient, SharingRegistryService.Client sharingClient, Entity entity) throws TException {
final String domainId = entity.getDomainId();
GatewayGroups gatewayGroups = retrieveGatewayGroups(regClient, domainId);
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 08cefbd..1760a11 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -363,6 +363,24 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
+    /**
+     * Creates and saves the output-fetching tasks for the given process, writes the process
+     * model back to the registry and returns the task DAG string built from the new task ids.
+     *
+     * @param gatewayId    gateway id passed on to the task creation
+     * @param processModel process the tasks are created for; it is also saved via
+     *                     {@code updateProcess} under its own process id
+     * @return the value produced by {@code getTaskDag} for the created task ids
+     *         (an empty string when no task was created)
+     * @throws OrchestratorException wrapping any exception raised while creating or saving
+     */
+    public String createAndSaveIntermediateOutputFetchingTasks(String gatewayId, ProcessModel processModel) throws OrchestratorException {
+        final RegistryService.Client registryClient = getRegistryServiceClient();
+        try {
+            final List<String> fetchTaskIds =
+                    new ArrayList<>(createAndSaveIntermediateOutputDataStagingTasks(processModel, gatewayId));
+            // Save the process model as it stands after the tasks were created.
+            registryClient.updateProcess(processModel, processModel.getProcessId());
+            return getTaskDag(fetchTaskIds);
+        } catch (Exception e) {
+            throw new OrchestratorException("Error during creating process", e);
+        } finally {
+            // The registry client is obtained per call, so it is closed on every exit path.
+            if (registryClient != null) {
+                ThriftUtils.close(registryClient);
+            }
+        }
+    }
+
private String getTaskDag(List<String> taskIdList) {
if (taskIdList.isEmpty()) {
return "";
@@ -495,6 +513,48 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return dataStagingTaskIds;
}
+    /**
+     * Creates and saves one output-fetching staging task for every STDOUT, STDERR, URI and
+     * URI_COLLECTION output of the process and returns the ids of the saved tasks.
+     *
+     * <p>A STDOUT or STDERR output whose value is null or blank is first given the default
+     * value {@code <appName>.stdout} / {@code <appName>.stderr}, where {@code appName} is the
+     * result of {@code OrchestratorUtils.getApplicationInterfaceName(processModel)}. Outputs of
+     * any other type, and outputs with no type set, are skipped.
+     *
+     * @param processModel process whose outputs are inspected; STDOUT/STDERR outputs may have
+     *                     their value filled in as described above
+     * @param gatewayId    gateway id passed on to the task creation
+     * @return ids of the tasks that were saved, in output order; empty when nothing qualified
+     * @throws AiravataException     if building or saving a task fails
+     * @throws TException            declared by this method; propagated from the calls made here
+     * @throws OrchestratorException declared by this method; propagated from the task creation
+     */
+    public List<String> createAndSaveIntermediateOutputDataStagingTasks(ProcessModel processModel, String gatewayId)
+            throws AiravataException, TException, OrchestratorException {
+
+        final RegistryService.Client registryClient = getRegistryServiceClient();
+        List<String> dataStagingTaskIds = new ArrayList<>();
+        try {
+            List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+            String appName = OrchestratorUtils.getApplicationInterfaceName(processModel);
+            if (processOutputs != null) {
+                for (OutputDataObjectType processOutput : processOutputs) {
+                    DataType type = processOutput.getType();
+                    if (type == null) {
+                        // A switch on a null enum throws NullPointerException; an output
+                        // without a type is treated like any other non-stageable output.
+                        continue;
+                    }
+                    switch (type) {
+                        case STDOUT:
+                        case STDERR:
+                            if (null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()) {
+                                // No explicit file name given: fall back to the per-application default.
+                                processOutput.setValue(appName + (type == DataType.STDOUT ? ".stdout" : ".stderr"));
+                            }
+                            createIntermediateOutputDataStagingTasks(registryClient, processModel, gatewayId, dataStagingTaskIds, processOutput);
+                            break;
+                        case URI:
+                        case URI_COLLECTION:
+                            createIntermediateOutputDataStagingTasks(registryClient, processModel, gatewayId, dataStagingTaskIds, processOutput);
+                            break;
+                        default:
+                            // nothing to do
+                            break;
+                    }
+                }
+            }
+
+        } finally {
+            if (registryClient != null) {
+                ThriftUtils.close(registryClient);
+            }
+        }
+        return dataStagingTaskIds;
+    }
private boolean isArchive(RegistryService.Client registryClient, ProcessModel processModel, OrchestratorContext orchestratorContext) throws TException {
ApplicationInterfaceDescription appInterface = registryClient
.getApplicationInterface(processModel.getApplicationInterfaceId());
@@ -532,6 +592,23 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
}
+    /**
+     * Builds the output data staging task for a single process output, marks it as an
+     * OUTPUT_FETCHING task, saves it through the registry and appends the id of the saved
+     * task to {@code dataStagingTaskIds}.
+     *
+     * @param registryClient     registry client used to build and save the task
+     * @param processModel       process the task is added to
+     * @param gatewayId          gateway id passed on to the task construction
+     * @param dataStagingTaskIds list that receives the id of the saved task
+     * @param processOutput      the output this task is created for
+     * @throws AiravataException     when a Thrift error occurs while building or saving the task
+     * @throws OrchestratorException declared by this method; propagated from the task construction
+     */
+    private void createIntermediateOutputDataStagingTasks(RegistryService.Client registryClient,
+                                                          ProcessModel processModel,
+                                                          String gatewayId,
+                                                          List<String> dataStagingTaskIds,
+                                                          OutputDataObjectType processOutput) throws AiravataException, OrchestratorException {
+        try {
+            final TaskModel fetchTask = getOutputDataStagingTask(registryClient, processModel, processOutput, gatewayId);
+            // Same task model as regular output staging, only the task type differs.
+            fetchTask.setTaskType(TaskTypes.OUTPUT_FETCHING);
+            final String savedTaskId = registryClient.addTask(fetchTask, processModel.getProcessId());
+            fetchTask.setTaskId(savedTaskId);
+            dataStagingTaskIds.add(fetchTask.getTaskId());
+        } catch (TException thriftFailure) {
+            throw new AiravataException("Error while serializing data staging sub task model", thriftFailure);
+        }
+    }
+
private List<String> createAndSaveSubmissionTasks(RegistryService.Client registryClient, String gatewayId,
JobSubmissionInterface jobSubmissionInterface,
ProcessModel processModel,
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 97b67f1..2f32e3d 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -34,10 +34,12 @@ import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescr
import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.data.replica.DataProductModel;
import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
import org.apache.airavata.model.data.replica.ReplicaLocationCategory;
+import org.apache.airavata.model.error.ExperimentNotFoundException;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.ExperimentType;
@@ -48,6 +50,7 @@ import org.apache.airavata.model.status.ExperimentState;
import org.apache.airavata.model.status.ExperimentStatus;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.schedule.HostScheduler;
import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
@@ -57,6 +60,7 @@ import org.apache.airavata.orchestrator.cpi.orchestrator_cpiConstants;
import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor;
import org.apache.airavata.orchestrator.util.OrchestratorUtils;
import org.apache.airavata.registry.api.RegistryService;
+import org.apache.airavata.registry.api.RegistryService.Client;
import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.apache.commons.lang.StringUtils;
@@ -346,6 +350,66 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
}
+    /**
+     * Submits a process that fetches the named outputs of an experiment.
+     *
+     * <p>Any exception raised while submitting is logged and not rethrown, so the caller
+     * sees a normal return even when the submission failed.
+     *
+     * @param experimentId id of the experiment whose outputs are requested
+     * @param gatewayId    id of the gateway the experiment belongs to
+     * @param outputNames  names of the outputs to fetch
+     * @throws TException declared by this method
+     */
+    public void fetchIntermediateOutputs(String experimentId, String gatewayId, List<String> outputNames) throws TException {
+        final RegistryService.Client registryClient = getRegistryServiceClient();
+        try {
+            submitIntermediateOutputsProcess(registryClient, experimentId, gatewayId, outputNames);
+        } catch (Exception failure) {
+            // Failures are only logged here; nothing is propagated to the caller.
+            final String failureMessage = "expId : " + experimentId + " :- Error while fetching intermediate";
+            log.error(failureMessage, failure);
+        } finally {
+            // The registry client is obtained per call, so it is closed on every exit path.
+            if (registryClient != null) {
+                ThriftUtils.close(registryClient);
+            }
+        }
+    }
+
+    /**
+     * Creates, registers and launches a separate process that fetches the requested outputs
+     * of an experiment.
+     *
+     * <p>The process is cloned from the experiment, its data directory is pointed at an
+     * {@code /intermediates} sub-directory, and its outputs are reduced to those whose name
+     * appears in {@code outputNames}. The process is then added to the registry, its fetching
+     * tasks are created, the resulting task DAG is saved, and the process is launched with the
+     * credential store token resolved for the experiment.
+     *
+     * @param registryClient registry client used for all registry calls
+     * @param experimentId   id of the experiment whose outputs are requested
+     * @param gatewayId      gateway id passed on to the task creation
+     * @param outputNames    names of the outputs to fetch; {@code null} selects no output
+     * @throws Exception if the experiment is not configured with a group resource profile or
+     *                   a resource scheduling, if no credential store token can be found, or
+     *                   if any registry or orchestrator call fails
+     */
+    private void submitIntermediateOutputsProcess(Client registryClient, String experimentId, String gatewayId, List<String> outputNames) throws Exception {
+
+        ExperimentModel experimentModel = registryClient.getExperiment(experimentId);
+        ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
+        processModel.setExperimentDataDir(processModel.getExperimentDataDir() + "/intermediates");
+        List<OutputDataObjectType> outputs = processModel.getProcessOutputs();
+        List<OutputDataObjectType> requestedOutputs = new ArrayList<>();
+        // Either list may be null; in that case no output is selected rather than
+        // failing with a NullPointerException.
+        if (outputs != null && outputNames != null) {
+            for (OutputDataObjectType output : outputs) {
+                if (outputNames.contains(output.getName())) {
+                    requestedOutputs.add(output);
+                }
+            }
+        }
+        processModel.setProcessOutputs(requestedOutputs);
+        String processId = registryClient.addProcess(processModel, experimentId);
+        processModel.setProcessId(processId);
+        String taskDag = orchestrator.createAndSaveIntermediateOutputFetchingTasks(gatewayId, processModel);
+        processModel.setTaskDag(taskDag);
+
+        registryClient.updateProcess(processModel, processModel.getProcessId());
+
+        String token = resolveIntermediateOutputsCredentialToken(registryClient, experimentModel, experimentId);
+        // TODO: handle errors by updating the PROCESS status
+        orchestrator.launchProcess(processModel, token);
+    }
+
+    /**
+     * Resolves the credential store token used to launch the output-fetching process: the
+     * resource-specific token of the group compute resource preference when it is set and
+     * non-empty, otherwise the default token of the group resource profile.
+     *
+     * @param registryClient  registry client used to look up the preference and profile
+     * @param experimentModel experiment whose user configuration selects profile and resource
+     * @param experimentId    experiment id, used in error messages only
+     * @return a non-empty credential store token
+     * @throws Exception if the experiment lacks the required configuration or neither level
+     *                   provides a token
+     */
+    private String resolveIntermediateOutputsCredentialToken(Client registryClient, ExperimentModel experimentModel, String experimentId) throws Exception {
+        UserConfigurationDataModel userConfigurationData = experimentModel.getUserConfigurationData();
+        if (userConfigurationData == null) {
+            throw new Exception("Experiment has no user configuration data: " + experimentId);
+        }
+        final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId();
+        if (groupResourceProfileId == null) {
+            throw new Exception("Experiment not configured with a Group Resource Profile: " + experimentId);
+        }
+        if (userConfigurationData.getComputationalResourceScheduling() == null) {
+            throw new Exception("Experiment not configured with a computational resource scheduling: " + experimentId);
+        }
+        GroupComputeResourcePreference groupComputeResourcePreference = registryClient.getGroupComputeResourcePreference(
+                userConfigurationData.getComputationalResourceScheduling().getResourceHostId(),
+                groupResourceProfileId);
+        String token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+        if (token == null || token.isEmpty()) {
+            // try with group resource profile level token
+            GroupResourceProfile groupResourceProfile = registryClient.getGroupResourceProfile(groupResourceProfileId);
+            token = groupResourceProfile.getDefaultCredentialStoreToken();
+        }
+        // If the token is still empty, the request cannot proceed.
+        if (token == null || token.isEmpty()) {
+            throw new Exception("You have not configured credential store token at group resource profile or compute resource preference." +
+                    " Please provide the correct token at group resource profile or compute resource preference.");
+        }
+        return token;
+    }
+
private String getAiravataUserName() {
return airavataUserName;
}
@@ -679,6 +743,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
case EXPERIMENT_CANCEL:
cancelExperiment(messageContext);
break;
+ case INTERMEDIATE_OUTPUTS:
+ handleIntermediateOutputsEvent(messageContext);
+ break;
default:
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
log.error("Orchestrator got un-support message type : " + messageContext.getType());
@@ -702,6 +769,22 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
}
+
+    /**
+     * Handles an INTERMEDIATE_OUTPUTS message: rebuilds the
+     * {@link ExperimentIntermediateOutputsEvent} from the message's event by serializing it and
+     * reading the bytes back, logs the request and starts the fetch for the experiment.
+     *
+     * <p>The message is acknowledged in every case, including when a Thrift error is
+     * rethrown as a {@link RuntimeException}.
+     *
+     * @param messageContext the received message; its delivery tag is used for the ack
+     */
+    private void handleIntermediateOutputsEvent(MessageContext messageContext) {
+        try {
+            final byte[] serializedEvent = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+            final ExperimentIntermediateOutputsEvent outputsEvent = new ExperimentIntermediateOutputsEvent();
+            ThriftUtils.createThriftFromBytes(serializedEvent, outputsEvent);
+            log.info("INTERMEDIATE_OUTPUTS event for experimentId: {} gateway Id: {} outputs: {}",
+                    outputsEvent.getExperimentId(), outputsEvent.getGatewayId(), outputsEvent.getOutputNames());
+            fetchIntermediateOutputs(outputsEvent.getExperimentId(), outputsEvent.getGatewayId(), outputsEvent.getOutputNames());
+        } catch (TException thriftFailure) {
+            log.error("Error while fetching intermediate outputs", thriftFailure);
+            throw new RuntimeException("Error while fetching intermediate outputs", thriftFailure);
+        } finally {
+            // Ack on every exit path.
+            experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+        }
+    }
}
private void launchExperiment(MessageContext messageContext) {