You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2021/12/15 21:54:15 UTC

[airavata] branch AIRAVATA-3549 updated: AIRAVATA-3549 Initial API/Orch changes for fetching intermediate outputs

This is an automated email from the ASF dual-hosted git repository.

machristie pushed a commit to branch AIRAVATA-3549
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/AIRAVATA-3549 by this push:
     new 5ef1a8a  AIRAVATA-3549 Initial API/Orch changes for fetching intermediate outputs
5ef1a8a is described below

commit 5ef1a8a730e0e0f5fcc78be4ade0eff400152b8c
Author: Marcus Christie <ma...@apache.org>
AuthorDate: Wed Dec 15 16:54:03 2021 -0500

    AIRAVATA-3549 Initial API/Orch changes for fetching intermediate outputs
---
 .../api/server/handler/AiravataServerHandler.java  | 17 ++++-
 .../cpi/impl/SimpleOrchestratorImpl.java           | 77 ++++++++++++++++++++
 .../server/OrchestratorServerHandler.java          | 83 ++++++++++++++++++++++
 3 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 088c909..924de01 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -71,6 +71,7 @@ import org.apache.airavata.model.experiment.*;
 import org.apache.airavata.model.group.ResourcePermissionType;
 import org.apache.airavata.model.group.ResourceType;
 import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.messaging.event.ExperimentIntermediateOutputsEvent;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
@@ -1944,8 +1945,13 @@ public class AiravataServerHandler implements Airavata.Iface {
     public void fetchIntermediateOutputs(AuthzToken authzToken, String airavataExperimentId, List<String> outputNames)
             throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException,
             AiravataSystemException, AuthorizationException, TException {
-        // TODO Publish message to experiment channel with MessageType.INTERMEDIATE_OUTPUTS
-        // TODO create an event of type ExperimentIntermediateOutputsEvent
+        // TODO: Verify user has access to experiment
+        String gatewayId = authzToken.getClaimsMap().get(Constants.GATEWAY_ID);
+        try {
+            submitExperimentIntermediateOutputsEvent(gatewayId, airavataExperimentId, outputNames);
+        } catch (AiravataException e) {
+            throw new RuntimeException("Failed to submit intermediate outputs event", e);
+        }
     }
 
     @SecurityCheck
@@ -6180,6 +6186,13 @@ public class AiravataServerHandler implements Airavata.Iface {
         experimentPublisher.publish(messageContext);
     }
 
+    private void submitExperimentIntermediateOutputsEvent(String gatewayId, String experimentId, List<String> outputNames) throws AiravataException {
+        // Wraps the request in an INTERMEDIATE_OUTPUTS message with a fresh random message id, timestamps it and publishes it.
+        final MessageContext outputsRequest = new MessageContext(new ExperimentIntermediateOutputsEvent(experimentId, gatewayId, outputNames), MessageType.INTERMEDIATE_OUTPUTS, "INTERMEDIATE_OUTPUTS.EXP-" + UUID.randomUUID(), gatewayId);
+        outputsRequest.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        experimentPublisher.publish(outputsRequest);
+    }
+
     private void shareEntityWithAdminGatewayGroups(RegistryService.Client regClient, SharingRegistryService.Client sharingClient, Entity entity) throws TException {
         final String domainId = entity.getDomainId();
         GatewayGroups gatewayGroups = retrieveGatewayGroups(regClient, domainId);
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 08cefbd..1760a11 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -363,6 +363,24 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         }
     }
 
+    public String createAndSaveIntermediateOutputFetchingTasks(String gatewayId, ProcessModel processModel) throws OrchestratorException {
+        // Creates and saves the data staging tasks that fetch the outputs listed on processModel, then
+        // returns getTaskDag(...) of the new task ids. Failures inside the try block are rethrown as OrchestratorException.
+        final RegistryService.Client registryClient = getRegistryServiceClient();
+        try {
+            List<String> taskIdList = new ArrayList<>(createAndSaveIntermediateOutputDataStagingTasks(processModel, gatewayId));
+            // save the process again: task creation may have filled in default stdout/stderr output values
+            registryClient.updateProcess(processModel, processModel.getProcessId());
+            return getTaskDag(taskIdList);
+        } catch (Exception e) {
+            throw new OrchestratorException("Error while creating intermediate output fetching tasks", e);
+        } finally {
+            if (registryClient != null) {
+                ThriftUtils.close(registryClient);
+            }
+        }
+    }
+
     private String getTaskDag(List<String> taskIdList) {
         if (taskIdList.isEmpty()) {
             return "";
@@ -495,6 +513,48 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return dataStagingTaskIds;
     }
 
+    public List<String> createAndSaveIntermediateOutputDataStagingTasks(ProcessModel processModel, String gatewayId)
+            throws AiravataException, TException, OrchestratorException {
+        // Adds one output-fetching data staging task for every STDOUT, STDERR, URI or
+        // URI_COLLECTION output of processModel and returns the ids of the added tasks.
+        // Outputs of any other type are skipped. The registry client opened here is closed
+        // again in the finally block.
+        final RegistryService.Client registryClient = getRegistryServiceClient();
+        final List<String> stagingTaskIds = new ArrayList<>();
+        try {
+            final List<OutputDataObjectType> declaredOutputs = processModel.getProcessOutputs();
+            final String appName = OrchestratorUtils.getApplicationInterfaceName(processModel);
+            if (declaredOutputs == null) {
+                // the process declares no outputs, so there is nothing to stage
+                return stagingTaskIds;
+            }
+            for (OutputDataObjectType output : declaredOutputs) {
+                final DataType outputType = output.getType();
+                switch (outputType) {
+                    case STDOUT:
+                    case STDERR:
+                        // a missing or blank value defaults to appName + ".stdout" / ".stderr"
+                        final String currentValue = output.getValue();
+                        if (currentValue == null || currentValue.trim().isEmpty()) {
+                            output.setValue(appName + (outputType == DataType.STDOUT ? ".stdout" : ".stderr"));
+                        }
+                        // deliberate fall-through: from here on stdout/stderr are handled like URI outputs
+                    case URI:
+                    case URI_COLLECTION:
+                        createIntermediateOutputDataStagingTasks(registryClient, processModel, gatewayId, stagingTaskIds, output);
+                        break;
+                    default:
+                        // nothing to do
+                        break;
+                }
+            }
+            return stagingTaskIds;
+        } finally {
+            if (registryClient != null) {
+                ThriftUtils.close(registryClient);
+            }
+        }
+    }
     private boolean isArchive(RegistryService.Client registryClient, ProcessModel processModel, OrchestratorContext orchestratorContext) throws TException {
         ApplicationInterfaceDescription appInterface = registryClient
                 .getApplicationInterface(processModel.getApplicationInterfaceId());
@@ -532,6 +592,23 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         }
     }
 
+    private void createIntermediateOutputDataStagingTasks(RegistryService.Client registryClient, ProcessModel processModel,
+            String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput)
+            throws AiravataException, OrchestratorException {
+        // Builds the staging task for processOutput with getOutputDataStagingTask, marks it as
+        // TaskTypes.OUTPUT_FETCHING, adds it to the registry under the process id, and appends
+        // the resulting task id to dataStagingTaskIds.
+        try {
+            final TaskModel fetchTask = getOutputDataStagingTask(registryClient, processModel, processOutput, gatewayId);
+            fetchTask.setTaskType(TaskTypes.OUTPUT_FETCHING);
+            fetchTask.setTaskId(registryClient.addTask(fetchTask, processModel.getProcessId()));
+            dataStagingTaskIds.add(fetchTask.getTaskId());
+        } catch (TException thriftFailure) {
+            // a TException is rethrown as AiravataException, keeping the original as its cause
+            throw new AiravataException("Error while serializing data staging sub task model", thriftFailure);
+        }
+    }
+
     private List<String> createAndSaveSubmissionTasks(RegistryService.Client registryClient, String gatewayId,
                                                       JobSubmissionInterface jobSubmissionInterface,
                                                       ProcessModel processModel,
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 97b67f1..2f32e3d 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -34,10 +34,12 @@ import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescr
 import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
 import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.data.replica.DataProductModel;
 import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
 import org.apache.airavata.model.data.replica.ReplicaLocationCategory;
+import org.apache.airavata.model.error.ExperimentNotFoundException;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentType;
@@ -48,6 +50,7 @@ import org.apache.airavata.model.status.ExperimentState;
 import org.apache.airavata.model.status.ExperimentStatus;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.schedule.HostScheduler;
 import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
@@ -57,6 +60,7 @@ import org.apache.airavata.orchestrator.cpi.orchestrator_cpiConstants;
 import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor;
 import org.apache.airavata.orchestrator.util.OrchestratorUtils;
 import org.apache.airavata.registry.api.RegistryService;
+import org.apache.airavata.registry.api.RegistryService.Client;
 import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
 import org.apache.airavata.registry.api.exception.RegistryServiceException;
 import org.apache.commons.lang.StringUtils;
@@ -346,6 +350,66 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 		}
 	}
 
+	public void fetchIntermediateOutputs(String experimentId, String gatewayId, List<String> outputNames) throws TException { // opens a registry client, delegates to submitIntermediateOutputsProcess, then closes the client
+		final RegistryService.Client registryClient = getRegistryServiceClient();
+		try {
+			submitIntermediateOutputsProcess(registryClient, experimentId, gatewayId, outputNames);
+		} catch (Exception e) { // best-effort: a failure is logged here and not propagated to the caller
+			log.error("expId : {} :- Error while fetching intermediate outputs", experimentId, e);
+		} finally {
+			if (registryClient != null) {
+				ThriftUtils.close(registryClient);
+			}
+		}
+	}
+
+	private void submitIntermediateOutputsProcess(Client registryClient, String experimentId, String gatewayId, List<String> outputNames) throws Exception {
+		// Builds a process from the experiment via cloneProcessFromExperiment, keeps only the requested outputs, registers it with its fetching tasks and launches it.
+		ExperimentModel experimentModel = registryClient.getExperiment(experimentId);
+		ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
+		processModel.setExperimentDataDir(processModel.getExperimentDataDir() + "/intermediates"); // this process gets its data dir suffixed with "/intermediates"
+		List<OutputDataObjectType> outputs = processModel.getProcessOutputs();
+		List<OutputDataObjectType> requestedOutputs = new ArrayList<>(); // the subset of outputs whose name appears in outputNames
+		for (OutputDataObjectType output : outputs) {
+			if (outputNames.contains(output.getName())) {
+				requestedOutputs.add(output);
+			}
+		}
+		processModel.setProcessOutputs(requestedOutputs);
+		String processId = registryClient.addProcess(processModel, experimentId);
+		processModel.setProcessId(processId);
+		String taskDag = orchestrator.createAndSaveIntermediateOutputFetchingTasks(gatewayId, processModel);
+		processModel.setTaskDag(taskDag);
+		// update the registry copy of the process so that it carries the task DAG set above
+		registryClient.updateProcess(processModel, processModel.getProcessId());
+
+		// Figure out the credential token: the resource-specific token is tried first, then the group resource profile default
+		UserConfigurationDataModel userConfigurationData = experimentModel.getUserConfigurationData();
+		String token = null;
+		final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId();
+		if (groupResourceProfileId == null) {
+			throw new Exception("Experiment not configured with a Group Resource Profile: " + experimentId);
+		}
+		GroupComputeResourcePreference groupComputeResourcePreference = registryClient.getGroupComputeResourcePreference(
+				userConfigurationData.getComputationalResourceScheduling().getResourceHostId(),
+				groupResourceProfileId);
+		if (groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != null) {
+			token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+		}
+		if (token == null || token.isEmpty()){
+			// try with group resource profile level token
+			GroupResourceProfile groupResourceProfile = registryClient.getGroupResourceProfile(groupResourceProfileId);
+			token = groupResourceProfile.getDefaultCredentialStoreToken();
+		}
+		// if the token is still empty there is nothing to launch with, so abort by throwing
+		if (token == null || token.isEmpty()){
+			throw new Exception("You have not configured credential store token at group resource profile or compute resource preference." +
+					" Please provide the correct token at group resource profile or compute resource preference.");
+		}
+		// TODO: handle errors by updating the PROCESS status (the process is already in the registry when the checks above throw)
+		orchestrator.launchProcess(processModel, token);
+	}
+
 	private String getAiravataUserName() {
 		return airavataUserName;
 	}
@@ -679,6 +743,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 				case EXPERIMENT_CANCEL:
                     cancelExperiment(messageContext);
 					break;
+				case INTERMEDIATE_OUTPUTS:
+					handleIntermediateOutputsEvent(messageContext);
+					break;
 				default:
 					experimentSubscriber.sendAck(messageContext.getDeliveryTag());
 					log.error("Orchestrator got un-support message type : " + messageContext.getType());
@@ -702,6 +769,22 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 			}
 
 		}
+
+		private void handleIntermediateOutputsEvent(MessageContext messageContext) {
+			// Decodes the message's event and passes it to fetchIntermediateOutputs; the finally block sends the ack whether or not that succeeded.
+			try {
+				final byte[] payload = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+				final ExperimentIntermediateOutputsEvent request = new ExperimentIntermediateOutputsEvent();
+				ThriftUtils.createThriftFromBytes(payload, request);
+				log.info("INTERMEDIATE_OUTPUTS event for experimentId: {} gateway Id: {} outputs: {}", request.getExperimentId(), request.getGatewayId(), request.getOutputNames());
+				fetchIntermediateOutputs(request.getExperimentId(), request.getGatewayId(), request.getOutputNames());
+			} catch (TException thriftFailure) {
+				log.error("Error while fetching intermediate outputs", thriftFailure);
+				throw new RuntimeException("Error while fetching intermediate outputs", thriftFailure);
+			} finally {
+				experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+			}
+		}
 	}
 
 	private void launchExperiment(MessageContext messageContext) {