You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/02 22:41:07 UTC
[11/25] airavata git commit: rename airavata-jpa-registry module to
experiment-catalog
http://git-wip-us.apache.org/repos/asf/airavata/blob/22bcbb40/modules/registry/experiment-catalog/src/main/java/org/apache/airavata/experiment/catalog/impl/ExperimentRegistry.java
----------------------------------------------------------------------
diff --git a/modules/registry/experiment-catalog/src/main/java/org/apache/airavata/experiment/catalog/impl/ExperimentRegistry.java b/modules/registry/experiment-catalog/src/main/java/org/apache/airavata/experiment/catalog/impl/ExperimentRegistry.java
new file mode 100644
index 0000000..bd00a1e
--- /dev/null
+++ b/modules/registry/experiment-catalog/src/main/java/org/apache/airavata/experiment/catalog/impl/ExperimentRegistry.java
@@ -0,0 +1,2983 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.experiment.catalog.impl;
+
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.experiment.catalog.Resource;
+import org.apache.airavata.experiment.catalog.ResourceType;
+import org.apache.airavata.experiment.catalog.ResourceUtils;
+import org.apache.airavata.experiment.catalog.resources.*;
+import org.apache.airavata.experiment.catalog.utils.ThriftDataModelConversion;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.registry.cpi.ResultOrderType;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.airavata.registry.cpi.utils.StatusType;
+
+import java.sql.Timestamp;
+import java.util.*;
+
+public class ExperimentRegistry {
+ private GatewayResource gatewayResource;
+ private WorkerResource workerResource;
+ private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(ExperimentRegistry.class);
+
+ public ExperimentRegistry(GatewayResource gateway, UserResource user) throws RegistryException {
+ gatewayResource = gateway;
+ if (!gatewayResource.isExists(ResourceType.GATEWAY_WORKER, user.getUserName())) {
+ workerResource = ResourceUtils.addGatewayWorker(gateway, user);
+ } else {
+ workerResource = (WorkerResource) ResourceUtils.getWorker(gateway.getGatewayId(), user.getUserName());
+ }
+
+ }
+
+ public String addExperiment(Experiment experiment, String gatewayId) throws RegistryException {
+ String experimentID;
+ try {
+ if (!ResourceUtils.isUserExist(experiment.getUserName())) {
+ ResourceUtils.addUser(experiment.getUserName(), null);
+ }
+
+ experimentID = getExperimentID(experiment.getName());
+ experiment.setExperimentID(experimentID);
+ ExperimentResource experimentResource = new ExperimentResource();
+ experimentResource.setExpID(experimentID);
+ experimentResource.setExpName(experiment.getName());
+ experimentResource.setExecutionUser(experiment.getUserName());
+ experimentResource.setGatewayId(gatewayId);
+ experimentResource.setGatewayExecutionId(experiment.getGatewayExecutionId());
+ experimentResource.setEnableEmailNotifications(experiment.isEnableEmailNotification());
+ if (!workerResource.isProjectExists(experiment.getProjectID())) {
+ logger.error("Project does not exist in the system..");
+ throw new Exception("Project does not exist in the system, Please create the project first...");
+ }
+ experimentResource.setProjectId(experiment.getProjectID());
+ experimentResource.setCreationTime(AiravataUtils.getTime(experiment.getCreationTime()));
+ experimentResource.setDescription(experiment.getDescription());
+ experimentResource.setApplicationId(experiment.getApplicationId());
+ experimentResource.setApplicationVersion(experiment.getApplicationVersion());
+ experimentResource.setWorkflowTemplateId(experiment.getWorkflowTemplateId());
+ experimentResource.setWorkflowTemplateVersion(experiment.getWorkflowTemplateVersion());
+ experimentResource.setWorkflowExecutionId(experiment.getWorkflowExecutionInstanceId());
+ experimentResource.save();
+
+ List<String> emailAddresses = experiment.getEmailAddresses();
+ if (emailAddresses != null && !emailAddresses.isEmpty()){
+ for (String email : emailAddresses){
+ NotificationEmailResource emailResource = new NotificationEmailResource();
+ emailResource.setExperimentId(experimentID);
+ emailResource.setEmailAddress(email);
+ emailResource.save();
+ }
+ }
+
+ List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
+ if (experimentInputs != null) {
+ addExpInputs(experimentInputs, experimentResource);
+ }
+
+ UserConfigurationData userConfigurationData = experiment.getUserConfigurationData();
+ if (userConfigurationData != null) {
+ addUserConfigData(userConfigurationData, experimentID);
+ }
+
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null && !experimentOutputs.isEmpty()) {
+ //TODO: short change.
+// for (DataObjectType output : experimentOutputs){
+// output.setValue("");
+// }
+ addExpOutputs(experimentOutputs, experimentID);
+ }
+
+// ExperimentStatus experimentStatus = experiment.getExperimentStatus();
+// if (experimentStatus != null){
+// updateExperimentStatus(experimentStatus, experimentID);
+// }else {
+ ExperimentStatus experimentStatus = new ExperimentStatus();
+ experimentStatus.setExperimentState(ExperimentState.CREATED);
+ updateExperimentStatus(experimentStatus, experimentID);
+// }
+
+ List<WorkflowNodeDetails> workflowNodeDetailsList = experiment.getWorkflowNodeDetailsList();
+ if (workflowNodeDetailsList != null && !workflowNodeDetailsList.isEmpty()) {
+ for (WorkflowNodeDetails wf : workflowNodeDetailsList) {
+ addWorkflowNodeDetails(wf, experimentID);
+ }
+ }
+ List<ErrorDetails> errors = experiment.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails errror : errors) {
+ addErrorDetails(errror, experimentID);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while saving experiment to registry", e);
+ throw new RegistryException(e);
+ }
+ return experimentID;
+ }
+
+ public String addUserConfigData(UserConfigurationData configurationData, String experimentID) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment(experimentID);
+ ConfigDataResource configData = (ConfigDataResource) experiment.create(ResourceType.CONFIG_DATA);
+ configData.setExperimentId(experimentID);
+ configData.setAiravataAutoSchedule(configurationData.isAiravataAutoSchedule());
+ configData.setOverrideManualParams(configurationData.isOverrideManualScheduledParams());
+ configData.setShareExp(configurationData.isShareExperimentPublicly());
+ configData.setUserDn(configurationData.getUserDN());
+ configData.setGenerateCert(configurationData.isGenerateCert());
+ configData.save();
+ ComputationalResourceScheduling resourceScheduling = configurationData.getComputationalResourceScheduling();
+ if (resourceScheduling != null) {
+ addComputationScheduling(resourceScheduling, experiment);
+ }
+ AdvancedInputDataHandling inputDataHandling = configurationData.getAdvanceInputDataHandling();
+ if (inputDataHandling != null) {
+ addInputDataHandling(inputDataHandling, experiment);
+ }
+
+ AdvancedOutputDataHandling outputDataHandling = configurationData.getAdvanceOutputDataHandling();
+ if (outputDataHandling != null) {
+ addOutputDataHandling(outputDataHandling, experiment);
+ }
+
+ QualityOfServiceParams qosParams = configurationData.getQosParams();
+ if (qosParams != null) {
+ addQosParams(qosParams, experiment);
+ }
+ } catch (Exception e) {
+ logger.error("Unable to save user config data", e);
+ throw new RegistryException(e);
+ }
+ return experimentID;
+ }
+
+ public void addQosParams(QualityOfServiceParams qosParams, Resource resource) throws RegistryException {
+ try {
+ QosParamResource qosr = new QosParamResource();
+ if (resource instanceof ExperimentResource) {
+ ExperimentResource experiment = (ExperimentResource) resource;
+ qosr.setExperimentId(experiment.getExpID());
+ }
+ if (resource instanceof TaskDetailResource) {
+ TaskDetailResource taskDetailResource = (TaskDetailResource) resource;
+ qosr.setTaskId(taskDetailResource.getTaskId());
+ String nodeId = taskDetailResource.getNodeId();
+ ExperimentResource experimentResource = new ExperimentResource();
+ WorkflowNodeDetailResource workflowNode = experimentResource.getWorkflowNode(nodeId);
+ qosr.setExperimentId(workflowNode.getExperimentId());
+ }
+ qosr.setStartExecutionAt(qosParams.getStartExecutionAt());
+ qosr.setExecuteBefore(qosParams.getExecuteBefore());
+ qosr.setNoOfRetries(qosParams.getNumberofRetries());
+ qosr.save();
+ } catch (Exception e) {
+ logger.error("Unable to save QOS params", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void addOutputDataHandling(AdvancedOutputDataHandling outputDataHandling, Resource resource) throws RegistryException {
+ AdvancedOutputDataHandlingResource adodh = new AdvancedOutputDataHandlingResource();
+ try {
+ if (resource instanceof ExperimentResource) {
+ ExperimentResource experiment = (ExperimentResource) resource;
+ adodh.setExperimentId(experiment.getExpID());
+ }
+ if (resource instanceof TaskDetailResource) {
+ TaskDetailResource taskDetailResource = (TaskDetailResource) resource;
+ String nodeId = taskDetailResource.getNodeId();
+ ExperimentResource experimentResource = new ExperimentResource();
+ WorkflowNodeDetailResource workflowNode = experimentResource.getWorkflowNode(nodeId);
+ adodh.setExperimentId(workflowNode.getExperimentId());
+ adodh.setTaskId(taskDetailResource.getTaskId());
+ }
+ adodh.setOutputDataDir(outputDataHandling.getOutputDataDir());
+ adodh.setDataRegUrl(outputDataHandling.getDataRegistryURL());
+ adodh.setPersistOutputData(outputDataHandling.isPersistOutputData());
+ adodh.save();
+ } catch (Exception e) {
+ logger.error("Unable to save output data handling data", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void addInputDataHandling(AdvancedInputDataHandling inputDataHandling, Resource resource) throws RegistryException {
+ AdvanceInputDataHandlingResource adidh = new AdvanceInputDataHandlingResource();
+ try {
+ if (resource instanceof ExperimentResource) {
+ ExperimentResource experiment = (ExperimentResource) resource;
+ adidh.setExperimentId(experiment.getExpID());
+ }
+ if (resource instanceof TaskDetailResource) {
+ TaskDetailResource taskDetailResource = (TaskDetailResource) resource;
+ String nodeId = taskDetailResource.getNodeId();
+ ExperimentResource experimentResource = new ExperimentResource();
+ WorkflowNodeDetailResource workflowNode = experimentResource.getWorkflowNode(nodeId);
+ adidh.setExperimentId(workflowNode.getExperimentId());
+ adidh.setTaskId(taskDetailResource.getTaskId());
+ }
+ adidh.setWorkingDir(inputDataHandling.getUniqueWorkingDirectory());
+ adidh.setWorkingDirParent(inputDataHandling.getParentWorkingDirectory());
+ adidh.setStageInputFiles(inputDataHandling.isSetStageInputFilesToWorkingDir());
+ adidh.setCleanAfterJob(inputDataHandling.isCleanUpWorkingDirAfterJob());
+ adidh.save();
+ } catch (Exception e) {
+ logger.error("Unable to save input data handling data", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void addComputationScheduling(ComputationalResourceScheduling resourceScheduling, Resource resource) throws RegistryException {
+ ComputationSchedulingResource cmsr = new ComputationSchedulingResource();
+ try {
+ if (resource instanceof ExperimentResource) {
+ ExperimentResource experiment = (ExperimentResource) resource;
+ cmsr.setExperimentId(experiment.getExpID());
+ }
+ if (resource instanceof TaskDetailResource) {
+ TaskDetailResource taskDetailResource = (TaskDetailResource) resource;
+ String nodeId = taskDetailResource.getNodeId();
+ ExperimentResource experimentResource = new ExperimentResource();
+ WorkflowNodeDetailResource workflowNode = experimentResource.getWorkflowNode(nodeId);
+ cmsr.setExperimentId(workflowNode.getExperimentId());
+ cmsr.setTaskId(taskDetailResource.getTaskId());
+ }
+ cmsr.setResourceHostId(resourceScheduling.getResourceHostId());
+ cmsr.setCpuCount(resourceScheduling.getTotalCPUCount());
+ cmsr.setNodeCount(resourceScheduling.getNodeCount());
+ cmsr.setNumberOfThreads(resourceScheduling.getNumberOfThreads());
+ cmsr.setQueueName(resourceScheduling.getQueueName());
+ cmsr.setWalltimeLimit(resourceScheduling.getWallTimeLimit());
+ cmsr.setJobStartTime(AiravataUtils.getTime(resourceScheduling.getJobStartTime()));
+ cmsr.setPhysicalMemory(resourceScheduling.getTotalPhysicalMemory());
+ cmsr.setProjectName(resourceScheduling.getComputationalProjectAccount());
+ cmsr.save();
+ } catch (Exception e) {
+ logger.error("Unable to save computational scheduling data", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void addExpInputs(List<InputDataObjectType> exInputs, ExperimentResource experimentResource) throws RegistryException {
+ try {
+ for (InputDataObjectType input : exInputs) {
+ ExperimentInputResource resource = (ExperimentInputResource) experimentResource.create(ResourceType.EXPERIMENT_INPUT);
+ resource.setExperimentId(experimentResource.getExpID());
+ resource.setExperimentKey(input.getName());
+ resource.setValue(input.getValue());
+ if (input.getType() != null) {
+ resource.setDataType(input.getType().toString());
+ }
+ resource.setMetadata(input.getMetaData());
+ resource.setAppArgument(input.getApplicationArgument());
+ resource.setInputOrder(input.getInputOrder());
+ resource.setRequired(input.isIsRequired());
+ resource.setRequiredToCMD(input.isRequiredToAddedToCommandLine());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Unable to save experiment inputs", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void updateExpInputs(List<InputDataObjectType> exInputs, ExperimentResource experimentResource) throws RegistryException {
+ try {
+ List<ExperimentInputResource> experimentInputs = experimentResource.getExperimentInputs();
+ for (InputDataObjectType input : exInputs) {
+ for (ExperimentInputResource exinput : experimentInputs) {
+ if (exinput.getExperimentKey().equals(input.getName())) {
+ exinput.setValue(input.getValue());
+ if (input.getType() != null) {
+ exinput.setDataType(input.getType().toString());
+ }
+ exinput.setMetadata(input.getMetaData());
+ exinput.setAppArgument(input.getApplicationArgument());
+ exinput.setInputOrder(input.getInputOrder());
+ exinput.setRequired(input.isIsRequired());
+ exinput.setRequiredToCMD(input.isRequiredToAddedToCommandLine());
+ exinput.save();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unable to update experiment inputs", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public String addExpOutputs(List<OutputDataObjectType> exOutput, String expId) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment(expId);
+ for (OutputDataObjectType output : exOutput) {
+ ExperimentOutputResource resource = (ExperimentOutputResource) experiment.create(ResourceType.EXPERIMENT_OUTPUT);
+ resource.setExperimentId(expId);
+ resource.setExperimentKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+// resource.setMetadata(output.get());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding experiment outputs...", e);
+ throw new RegistryException(e);
+ }
+ return expId;
+ }
+
+ public void updateExpOutputs(List<OutputDataObjectType> exOutput, String expId) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment(expId);
+ List<ExperimentOutputResource> existingExpOutputs = experiment.getExperimentOutputs();
+ for (OutputDataObjectType output : exOutput) {
+ for (ExperimentOutputResource resource : existingExpOutputs) {
+ if (resource.getExperimentKey().equals(output.getName())) {
+ resource.setExperimentId(expId);
+ resource.setExperimentKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+// resource.setMetadata(output.getMetaData());
+ resource.save();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating experiment outputs", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addNodeOutputs(List<OutputDataObjectType> wfOutputs, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment((String) ids.getTopLevelIdentifier());
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode((String) ids.getSecondLevelIdentifier());
+ for (OutputDataObjectType output : wfOutputs) {
+ NodeOutputResource resource = (NodeOutputResource) workflowNode.create(ResourceType.NODE_OUTPUT);
+ resource.setNodeId(workflowNode.getNodeInstanceId());
+ resource.setOutputKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+// resource.setMetadata(output.getMetaData());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding node outputs...", e);
+ throw new RegistryException(e);
+ }
+ return (String) ids.getSecondLevelIdentifier();
+ }
+
+ public void updateNodeOutputs(List<OutputDataObjectType> wfOutputs, String nodeId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode(nodeId);
+ List<NodeOutputResource> nodeOutputs = workflowNode.getNodeOutputs();
+ for (OutputDataObjectType output : wfOutputs) {
+ for (NodeOutputResource resource : nodeOutputs) {
+ resource.setNodeId(workflowNode.getNodeInstanceId());
+ resource.setOutputKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+// resource.setMetadata(output.getMetaData());
+ resource.save();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating node outputs...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addApplicationOutputs(List<OutputDataObjectType> appOutputs, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode((String) ids.getTopLevelIdentifier());
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getSecondLevelIdentifier());
+ for (OutputDataObjectType output : appOutputs) {
+ ApplicationOutputResource resource = (ApplicationOutputResource) taskDetail.create(ResourceType.APPLICATION_OUTPUT);
+ resource.setTaskId(taskDetail.getTaskId());
+ resource.setOutputKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+// resource.setMetadata(output.getMetaData());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding application outputs...", e);
+ throw new RegistryException(e);
+ }
+ return (String) ids.getSecondLevelIdentifier();
+ }
+
+ public String updateExperimentStatus(ExperimentStatus experimentStatus, String expId) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment(expId);
+ StatusResource status = experiment.getExperimentStatus();
+ if (status == null) {
+ status = (StatusResource) experiment.create(ResourceType.STATUS);
+ }
+ status.setExperimentId(expId);
+ status.setStatusUpdateTime(AiravataUtils.getTime(experimentStatus.getTimeOfStateChange()));
+ if (status.getState() == null) {
+ status.setState(ExperimentState.UNKNOWN.name());
+ }
+ if (isValidStatusTransition(ExperimentState.valueOf(status.getState()), experimentStatus.getExperimentState())) {
+ status.setState(experimentStatus.getExperimentState().toString());
+ status.setStatusType(StatusType.EXPERIMENT.toString());
+ status.save();
+ logger.debugId(expId, "Updated experiment {} status to {}.", expId, experimentStatus.toString());
+ }
+ } catch (Exception e) {
+ logger.errorId(expId, "Error while updating experiment status...", e);
+ throw new RegistryException(e);
+ }
+ return expId;
+ }
+
+ public String addWorkflowNodeStatus(WorkflowNodeStatus status, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment((String) ids.getTopLevelIdentifier());
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode((String) ids.getSecondLevelIdentifier());
+ StatusResource statusResource = (StatusResource) experiment.create(ResourceType.STATUS);
+ statusResource.setExperimentId(experiment.getExpID());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setStatusType(StatusType.WORKFLOW_NODE.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ if (status.getWorkflowNodeState() == null) {
+ statusResource.setState(WorkflowNodeState.UNKNOWN.toString());
+ } else {
+ statusResource.setState(status.getWorkflowNodeState().toString());
+ }
+ statusResource.save();
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.error("Error while adding workflow node status...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String updateWorkflowNodeStatus(WorkflowNodeStatus status, String nodeId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode(nodeId);
+ StatusResource statusResource = workflowNode.getWorkflowNodeStatus();
+ if (statusResource == null) {
+ statusResource = (StatusResource) workflowNode.create(ResourceType.STATUS);
+ }
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(nodeId);
+ statusResource.setStatusType(StatusType.WORKFLOW_NODE.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ statusResource.setState(status.getWorkflowNodeState().toString());
+ statusResource.save();
+ logger.debugId(nodeId, "Updated workflow node {} status to {}.", nodeId, status.toString());
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.errorId(nodeId, "Error while updating workflow node status to " + status.toString() + "...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addTaskStatus(TaskStatus status, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode((String) ids.getTopLevelIdentifier());
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getSecondLevelIdentifier());
+ StatusResource statusResource = (StatusResource) workflowNode.create(ResourceType.STATUS);
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskDetail.getTaskId());
+ statusResource.setStatusType(StatusType.TASK.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ if (status.getExecutionState() == null) {
+ statusResource.setState(TaskState.UNKNOWN.toString());
+ } else {
+ statusResource.setState(status.getExecutionState().toString());
+ }
+ statusResource.save();
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.error("Error while adding task status...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void updateTaskStatus(TaskStatus status, String taskId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail(taskId);
+ StatusResource statusResource;
+ if (taskDetail.isTaskStatusExist(taskId)) {
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ statusResource = workflowNode.getTaskStatus(taskId);
+ } else {
+ statusResource = (StatusResource) taskDetail.create(ResourceType.STATUS);
+ }
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskId);
+ statusResource.setStatusType(StatusType.TASK.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ statusResource.setState(status.getExecutionState().toString());
+ statusResource.save();
+ logger.infoId(taskId, "Updated task {} status to {}.", taskId, status.toString());
+ } catch (Exception e) {
+ logger.errorId(taskId, "Error while updating task status to " + status.toString() + "...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ /**
+ * @param status job status
+ * @param ids composite id will contain taskid and jobid
+ * @return status id
+ */
+ public String addJobStatus(JobStatus status, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getTopLevelIdentifier());
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ JobDetailResource jobDetail = taskDetail.getJobDetail((String) ids.getSecondLevelIdentifier());
+ StatusResource statusResource = (StatusResource) jobDetail.create(ResourceType.STATUS);
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskDetail.getTaskId());
+ statusResource.setStatusType(StatusType.JOB.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ if (status.getJobState() == null) {
+ statusResource.setState(JobState.UNKNOWN.toString());
+ } else {
+ statusResource.setState(status.getJobState().toString());
+ }
+ statusResource.save();
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.error("Error while adding job status...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String updateJobStatus(JobStatus status, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getTopLevelIdentifier());
+ JobDetailResource jobDetail = taskDetail.getJobDetail((String) ids.getSecondLevelIdentifier());
+ StatusResource statusResource = jobDetail.getJobStatus();
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskDetail.getTaskId());
+ statusResource.setStatusType(StatusType.JOB.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ statusResource.setState(status.getJobState().toString());
+ statusResource.save();
+ logger.infoId(ids.toString(), "Updated job status to {}", status.toString());
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.errorId(ids.toString(), "Error while updating job status to " + status.toString() + " ...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ /**
+ * @param status application status
+ * @param ids composite id will contain taskid and jobid
+ * @return status id
+ */
+ public String addApplicationStatus(ApplicationStatus status, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getTopLevelIdentifier());
+ JobDetailResource jobDetail = taskDetail.getJobDetail((String) ids.getSecondLevelIdentifier());
+ StatusResource statusResource = (StatusResource) jobDetail.create(ResourceType.STATUS);
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskDetail.getTaskId());
+ statusResource.setStatusType(StatusType.APPLICATION.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ if (status.getApplicationState() == null) {
+ statusResource.setState("UNKNOWN");
+ } else {
+ statusResource.setState(status.getApplicationState());
+ }
+ statusResource.save();
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.error("Unable to read airavata-server properties", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void updateApplicationStatus(ApplicationStatus status, String jobId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = (TaskDetailResource) workflowNode.create(ResourceType.TASK_DETAIL);
+ JobDetailResource jobDetail = taskDetail.getJobDetail(jobId);
+ StatusResource statusResource = jobDetail.getApplicationStatus();
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskDetail.getTaskId());
+ statusResource.setStatusType(StatusType.APPLICATION.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ statusResource.setState(status.getApplicationState());
+ statusResource.save();
+ } catch (Exception e) {
+ logger.error("Error while updating application status...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+
+ /**
+ * @param status data transfer status
+ * @param ids contains taskId and transfer id
+ * @return status id
+ */
+ public String addTransferStatus(TransferStatus status, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getTopLevelIdentifier());
+ DataTransferDetailResource dataTransferDetail = taskDetail.getDataTransferDetail((String) ids.getSecondLevelIdentifier());
+ StatusResource statusResource = (StatusResource) dataTransferDetail.create(ResourceType.STATUS);
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ statusResource.setTaskId(taskDetail.getTaskId());
+ statusResource.setTransferId(dataTransferDetail.getTransferId());
+ statusResource.setStatusType(StatusType.DATA_TRANSFER.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ if (status.getTransferState() == null) {
+ statusResource.setState(TransferState.UNKNOWN.toString());
+ } else {
+ statusResource.setState(status.getTransferState().toString());
+ }
+ statusResource.save();
+ return String.valueOf(statusResource.getStatusId());
+ } catch (Exception e) {
+ logger.error("Error while adding transfer status...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void updateTransferStatus(TransferStatus status, String transferId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = (TaskDetailResource) workflowNode.create(ResourceType.TASK_DETAIL);
+ DataTransferDetailResource dataTransferDetail = taskDetail.getDataTransferDetail(transferId);
+ StatusResource statusResource = dataTransferDetail.getDataTransferStatus();
+
+ String taskId = dataTransferDetail.getTaskId();
+ taskDetail = workflowNode.getTaskDetail(taskId);
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ if (workflowNode != null) {
+ statusResource.setExperimentId(workflowNode.getExperimentId());
+ statusResource.setNodeId(workflowNode.getNodeInstanceId());
+ }
+ statusResource.setTaskId(taskId);
+ statusResource.setTransferId(transferId);
+ statusResource.setStatusType(StatusType.DATA_TRANSFER.toString());
+ statusResource.setStatusUpdateTime(AiravataUtils.getTime(status.getTimeOfStateChange()));
+ statusResource.setState(status.getTransferState().toString());
+ statusResource.save();
+ } catch (Exception e) {
+ logger.error("Error while updating transfer status...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addWorkflowNodeDetails(WorkflowNodeDetails nodeDetails, String expId) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment(expId);
+ WorkflowNodeDetailResource resource = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ resource.setExperimentId(expId);
+ resource.setNodeName(nodeDetails.getNodeName());
+ resource.setExecutionUnit(nodeDetails.getExecutionUnit().toString());
+ resource.setExecutionUnitData(nodeDetails.getExecutionUnitData());
+ resource.setCreationTime(AiravataUtils.getTime(nodeDetails.getCreationTime()));
+ resource.setNodeInstanceId(getNodeInstanceID(nodeDetails.getNodeName()));
+ resource.save();
+ String nodeId = resource.getNodeInstanceId();
+ List<InputDataObjectType> nodeInputs = nodeDetails.getNodeInputs();
+ if (nodeInputs != null) {
+ addWorkflowInputs(nodeDetails.getNodeInputs(), resource);
+ }
+ List<OutputDataObjectType> nodeOutputs = nodeDetails.getNodeOutputs();
+ if (nodeOutputs != null && !nodeOutputs.isEmpty()) {
+ CompositeIdentifier ids = new CompositeIdentifier(expId, nodeId);
+ addNodeOutputs(nodeOutputs, ids);
+ }
+ WorkflowNodeStatus workflowNodeStatus = nodeDetails.getWorkflowNodeStatus();
+ CompositeIdentifier ids = new CompositeIdentifier(expId, nodeId);
+ if (workflowNodeStatus == null) {
+ workflowNodeStatus = new WorkflowNodeStatus();
+ }
+// if (workflowNodeStatus.getWorkflowNodeState() != null){
+// WorkflowNodeStatus status = getWorkflowNodeStatus(nodeId);
+// if (status != null){
+// updateWorkflowNodeStatus(workflowNodeStatus, nodeId);
+// }else {
+// addWorkflowNodeStatus(workflowNodeStatus,ids);
+// }
+// }else {
+// workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.UNKNOWN);
+// addWorkflowNodeStatus(workflowNodeStatus, ids);
+// }
+ workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.UNKNOWN);
+ addWorkflowNodeStatus(workflowNodeStatus, ids);
+ List<TaskDetails> taskDetails = nodeDetails.getTaskDetailsList();
+ if (taskDetails != null && !taskDetails.isEmpty()) {
+ for (TaskDetails task : taskDetails) {
+ addTaskDetails(task, nodeId);
+ }
+ }
+ List<ErrorDetails> errors = nodeDetails.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
+ addErrorDetails(error, nodeId);
+ }
+ }
+ return nodeId;
+ } catch (Exception e) {
+ logger.error("Error while adding workflow node details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void updateWorkflowNodeDetails(WorkflowNodeDetails nodeDetails, String nodeId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode(nodeId);
+ workflowNode.setNodeName(nodeDetails.getNodeName());
+ workflowNode.setExecutionUnit(nodeDetails.getExecutionUnit().toString());
+ workflowNode.setExecutionUnitData(nodeDetails.getExecutionUnitData());
+ workflowNode.setCreationTime(AiravataUtils.getTime(nodeDetails.getCreationTime()));
+ workflowNode.setNodeInstanceId(nodeId);
+ workflowNode.save();
+ String expID = workflowNode.getExperimentId();
+ List<InputDataObjectType> nodeInputs = nodeDetails.getNodeInputs();
+ if (nodeInputs != null) {
+ updateWorkflowInputs(nodeDetails.getNodeInputs(), workflowNode);
+ }
+ List<OutputDataObjectType> nodeOutputs = nodeDetails.getNodeOutputs();
+ if (nodeOutputs != null && !nodeOutputs.isEmpty()) {
+ updateNodeOutputs(nodeOutputs, nodeId);
+ }
+ WorkflowNodeStatus workflowNodeStatus = nodeDetails.getWorkflowNodeStatus();
+ if (workflowNodeStatus != null) {
+ if (isWFNodeExist(nodeId)) {
+ updateWorkflowNodeStatus(workflowNodeStatus, nodeId);
+ } else {
+ CompositeIdentifier ids = new CompositeIdentifier(expID, nodeId);
+ addWorkflowNodeStatus(workflowNodeStatus, ids);
+ }
+ }
+ List<TaskDetails> taskDetails = nodeDetails.getTaskDetailsList();
+ if (taskDetails != null && !taskDetails.isEmpty()) {
+ for (TaskDetails task : taskDetails) {
+ String taskID = task.getTaskID();
+ if (isTaskDetailExist(taskID)) {
+ updateTaskDetails(task, taskID);
+ } else {
+ addTaskDetails(task, nodeId);
+ }
+ }
+ }
+ List<ErrorDetails> errors = nodeDetails.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
+ addErrorDetails(error, nodeId);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating workflow node details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+
+ public void addWorkflowInputs(List<InputDataObjectType> wfInputs, WorkflowNodeDetailResource nodeDetailResource) throws RegistryException {
+ try {
+ for (InputDataObjectType input : wfInputs) {
+ NodeInputResource resource = (NodeInputResource) nodeDetailResource.create(ResourceType.NODE_INPUT);
+ resource.setNodeId(nodeDetailResource.getNodeInstanceId());
+ resource.setInputKey(input.getName());
+ resource.setValue(input.getValue());
+ if (input.getType() != null) {
+ resource.setDataType(input.getType().toString());
+ }
+ resource.setMetadata(input.getMetaData());
+ resource.setAppArgument(input.getApplicationArgument());
+ resource.setInputOrder(input.getInputOrder());
+ resource.setRequired(input.isIsRequired());
+ resource.setRequiredToCMD(input.isRequiredToAddedToCommandLine());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding workflow inputs...", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void updateWorkflowInputs(List<InputDataObjectType> wfInputs, WorkflowNodeDetailResource nodeDetailResource) throws RegistryException {
+ try {
+ List<NodeInputResource> nodeInputs = nodeDetailResource.getNodeInputs();
+ for (InputDataObjectType input : wfInputs) {
+ for (NodeInputResource resource : nodeInputs) {
+ resource.setNodeId(nodeDetailResource.getNodeInstanceId());
+ resource.setInputKey(input.getName());
+ resource.setValue(input.getValue());
+ if (input.getType() != null) {
+ resource.setDataType(input.getType().toString());
+ }
+ resource.setMetadata(input.getMetaData());
+ resource.setAppArgument(input.getApplicationArgument());
+ resource.setInputOrder(input.getInputOrder());
+ resource.setRequired(input.isIsRequired());
+ resource.setRequiredToCMD(input.isRequiredToAddedToCommandLine());
+ resource.save();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating workflow inputs...", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public String addTaskDetails(TaskDetails taskDetails, String nodeId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = experiment.getWorkflowNode(nodeId);
+ TaskDetailResource taskDetail = (TaskDetailResource) workflowNode.create(ResourceType.TASK_DETAIL);
+ taskDetail.setNodeId(nodeId);
+ taskDetail.setTaskId(getTaskID(workflowNode.getNodeName()));
+ taskDetail.setApplicationId(taskDetails.getApplicationId());
+ taskDetail.setApplicationVersion(taskDetails.getApplicationVersion());
+ taskDetail.setCreationTime(AiravataUtils.getTime(taskDetails.getCreationTime()));
+ taskDetail.setEnableEmailNotifications(taskDetails.isEnableEmailNotification());
+ taskDetail.save();
+
+ List<String> emailAddresses = taskDetails.getEmailAddresses();
+ if (emailAddresses != null && !emailAddresses.isEmpty()){
+ for (String email : emailAddresses){
+ NotificationEmailResource emailResource = new NotificationEmailResource();
+ emailResource.setExperimentId(workflowNode.getExperimentId());
+ emailResource.setTaskId(taskDetail.getTaskId());
+ emailResource.setEmailAddress(email);
+ emailResource.save();
+ }
+ }
+
+ List<InputDataObjectType> applicationInputs = taskDetails.getApplicationInputs();
+ if (applicationInputs != null) {
+ addAppInputs(applicationInputs, taskDetail);
+ }
+ List<OutputDataObjectType> applicationOutput = taskDetails.getApplicationOutputs();
+ if (applicationOutput != null) {
+ addAppOutputs(applicationOutput, taskDetail);
+ }
+ ComputationalResourceScheduling taskScheduling = taskDetails.getTaskScheduling();
+ if (taskScheduling != null) {
+ addComputationScheduling(taskScheduling, taskDetail);
+ }
+ AdvancedInputDataHandling inputDataHandling = taskDetails.getAdvancedInputDataHandling();
+ if (inputDataHandling != null) {
+ addInputDataHandling(inputDataHandling, taskDetail);
+ }
+ AdvancedOutputDataHandling outputDataHandling = taskDetails.getAdvancedOutputDataHandling();
+ if (outputDataHandling != null) {
+ addOutputDataHandling(outputDataHandling, taskDetail);
+ }
+
+ List<JobDetails> jobDetailsList = taskDetails.getJobDetailsList();
+ if (jobDetailsList != null && !jobDetailsList.isEmpty()) {
+ for (JobDetails job : jobDetailsList) {
+ CompositeIdentifier ids = new CompositeIdentifier(taskDetail.getTaskId(), job.getJobID());
+ addJobDetails(job, ids);
+ }
+ }
+
+ List<DataTransferDetails> dataTransferDetailsList = taskDetails.getDataTransferDetailsList();
+ if (dataTransferDetailsList != null && !dataTransferDetailsList.isEmpty()) {
+ for (DataTransferDetails transferDetails : dataTransferDetailsList) {
+ addDataTransferDetails(transferDetails, taskDetail.getTaskId());
+ }
+ }
+
+ List<ErrorDetails> errors = taskDetails.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
+ addErrorDetails(error, taskDetail.getTaskId());
+ }
+ }
+
+ TaskStatus taskStatus = taskDetails.getTaskStatus();
+ CompositeIdentifier ids = new CompositeIdentifier(nodeId, taskDetail.getTaskId());
+ if (taskStatus != null) {
+ if (taskStatus.getExecutionState() != null) {
+ addTaskStatus(taskStatus, ids);
+ } else {
+ taskStatus.setExecutionState(TaskState.UNKNOWN);
+ addTaskStatus(taskStatus, ids);
+ }
+ } else {
+ TaskStatus status = new TaskStatus();
+ status.setExecutionState(TaskState.UNKNOWN);
+ addTaskStatus(status, ids);
+ }
+ return taskDetail.getTaskId();
+ } catch (Exception e) {
+ logger.error("Error while adding task details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String updateTaskDetails(TaskDetails taskDetails, String taskId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail(taskId);
+// taskDetail.setWorkflowNodeDetailResource(workflowNode);
+ taskDetail.setApplicationId(taskDetails.getApplicationId());
+ taskDetail.setApplicationVersion(taskDetails.getApplicationVersion());
+ taskDetail.setCreationTime(AiravataUtils.getTime(taskDetails.getCreationTime()));
+ taskDetail.setApplicationDeploymentId(taskDetails.getApplicationDeploymentId());
+ taskDetail.setEnableEmailNotifications(taskDetails.isEnableEmailNotification());
+ taskDetail.save();
+
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+
+ List<String> emailAddresses = taskDetails.getEmailAddresses();
+ // remove existing emails
+ taskDetail.remove(ResourceType.NOTIFICATION_EMAIL, taskId);
+ if (emailAddresses != null && !emailAddresses.isEmpty()){
+ for (String email : emailAddresses){
+ NotificationEmailResource emailResource = new NotificationEmailResource();
+ emailResource.setExperimentId(workflowNode.getExperimentId());
+ emailResource.setTaskId(taskId);
+ emailResource.setEmailAddress(email);
+ emailResource.save();
+ }
+ }
+ List<InputDataObjectType> applicationInputs = taskDetails.getApplicationInputs();
+ if (applicationInputs != null) {
+ updateAppInputs(applicationInputs, taskDetail);
+ }
+ ComputationalResourceScheduling taskScheduling = taskDetails.getTaskScheduling();
+ if (taskScheduling != null) {
+ updateSchedulingData(taskScheduling, taskDetail);
+ }
+ AdvancedInputDataHandling inputDataHandling = taskDetails.getAdvancedInputDataHandling();
+ if (inputDataHandling != null) {
+ updateInputDataHandling(inputDataHandling, taskDetail);
+ }
+ AdvancedOutputDataHandling outputDataHandling = taskDetails.getAdvancedOutputDataHandling();
+ if (outputDataHandling != null) {
+ updateOutputDataHandling(outputDataHandling, taskDetail);
+ }
+ List<JobDetails> jobDetailsList = taskDetails.getJobDetailsList();
+ if (jobDetailsList != null && !jobDetailsList.isEmpty()) {
+ for (JobDetails job : jobDetailsList) {
+ CompositeIdentifier ids = new CompositeIdentifier(taskId, job.getJobID());
+ updateJobDetails(job, ids);
+ }
+ }
+
+ List<DataTransferDetails> dataTransferDetailsList = taskDetails.getDataTransferDetailsList();
+ if (dataTransferDetailsList != null && !dataTransferDetailsList.isEmpty()) {
+ for (DataTransferDetails transferDetails : dataTransferDetailsList) {
+ updateDataTransferDetails(transferDetails, transferDetails.getTransferID());
+ }
+ }
+
+ List<ErrorDetails> errors = taskDetails.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
+ addErrorDetails(error, taskDetail.getTaskId());
+ }
+ }
+
+ TaskStatus taskStatus = taskDetails.getTaskStatus();
+ if (taskStatus != null) {
+ updateTaskStatus(taskStatus, taskId);
+ }
+ return taskDetail.getTaskId();
+ } catch (Exception e) {
+ logger.error("Error while updating task details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void addAppInputs(List<InputDataObjectType> appInputs, TaskDetailResource taskDetailResource) throws RegistryException {
+ try {
+ for (InputDataObjectType input : appInputs) {
+ ApplicationInputResource resource = (ApplicationInputResource) taskDetailResource.create(ResourceType.APPLICATION_INPUT);
+ resource.setTaskId(taskDetailResource.getTaskId());
+ resource.setInputKey(input.getName());
+ resource.setValue(input.getValue());
+ if (input.getType() != null) {
+ resource.setDataType(input.getType().toString());
+ }
+ resource.setMetadata(input.getMetaData());
+ resource.setAppArgument(input.getApplicationArgument());
+ resource.setInputOrder(input.getInputOrder());
+ resource.setRequired(input.isIsRequired());
+ resource.setRequiredToCMD(input.isRequiredToAddedToCommandLine());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding application inputs...", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void addAppOutputs(List<OutputDataObjectType> appOytputs, TaskDetailResource taskDetailResource) throws RegistryException {
+ try {
+ for (OutputDataObjectType output : appOytputs) {
+ ApplicationOutputResource resource = (ApplicationOutputResource) taskDetailResource.create(ResourceType.APPLICATION_OUTPUT);
+ resource.setTaskId(taskDetailResource.getTaskId());
+ resource.setOutputKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+ resource.save();
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding application outputs...", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public void updateAppOutputs(List<OutputDataObjectType> appOutputs, String taskId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail(taskId);
+ List<ApplicationOutputResource> outputs = taskDetail.getApplicationOutputs();
+ for (OutputDataObjectType output : appOutputs) {
+ for (ApplicationOutputResource resource : outputs) {
+ resource.setTaskId(taskId);
+ resource.setOutputKey(output.getName());
+ resource.setValue(output.getValue());
+ if (output.getType() != null) {
+ resource.setDataType(output.getType().toString());
+ }
+ resource.setRequired(output.isIsRequired());
+ resource.setRequiredToCMD(output.isRequiredToAddedToCommandLine());
+ resource.setDataMovement(output.isDataMovement());
+ resource.setDataNameLocation(output.getLocation());
+ resource.setAppArgument(output.getApplicationArgument());
+ resource.setSearchQuery(output.getSearchQuery());
+// resource.setMetadata(output.getMetaData());
+ resource.save();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating application outputs...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public void updateAppInputs(List<InputDataObjectType> appInputs, TaskDetailResource taskDetailResource) throws RegistryException {
+ try {
+ List<ApplicationInputResource> inputs = taskDetailResource.getApplicationInputs();
+ for (InputDataObjectType input : appInputs) {
+ for (ApplicationInputResource resource : inputs) {
+ resource.setTaskId(taskDetailResource.getTaskId());
+ resource.setInputKey(input.getName());
+ resource.setValue(input.getValue());
+ if (input.getType() != null) {
+ resource.setDataType(input.getType().toString());
+ }
+ resource.setMetadata(input.getMetaData());
+ resource.setAppArgument(input.getApplicationArgument());
+ resource.setInputOrder(input.getInputOrder());
+ resource.setRequired(input.isIsRequired());
+ resource.setRequiredToCMD(input.isRequiredToAddedToCommandLine());
+ resource.save();
+ }
+
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating application inputs...", e);
+ throw new RegistryException(e);
+ }
+
+ }
+
+ public String addJobDetails(JobDetails jobDetails, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail((String) ids.getTopLevelIdentifier());
+ JobDetailResource jobDetail = taskDetail.createJobDetail((String) ids.getSecondLevelIdentifier());
+ jobDetail.setTaskId(taskDetail.getTaskId());
+ jobDetail.setJobDescription(jobDetails.getJobDescription());
+ jobDetail.setCreationTime(AiravataUtils.getTime(jobDetails.getCreationTime()));
+ jobDetail.setComputeResourceConsumed(jobDetails.getComputeResourceConsumed());
+ jobDetail.setWorkingDir(jobDetails.getWorkingDir());
+ jobDetail.setJobName(jobDetails.getJobName());
+ jobDetail.save();
+ JobStatus jobStatus = jobDetails.getJobStatus();
+ if (jobStatus != null) {
+ JobStatus status = getJobStatus(ids);
+ if (status != null) {
+ updateJobStatus(jobStatus, ids);
+ } else {
+ addJobStatus(jobStatus, ids);
+ }
+ }
+ ApplicationStatus applicationStatus = jobDetails.getApplicationStatus();
+ if (applicationStatus != null) {
+ ApplicationStatus appStatus = getApplicationStatus(ids);
+ if (appStatus != null) {
+ updateApplicationStatus(applicationStatus, (String) ids.getSecondLevelIdentifier());
+ } else {
+ addApplicationStatus(applicationStatus, ids);
+ }
+ }
+ List<ErrorDetails> errors = jobDetails.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
+ addErrorDetails(error, ids.getSecondLevelIdentifier());
+ }
+ }
+ return jobDetail.getJobId();
+ } catch (Exception e) {
+ logger.error("Error while adding job details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ // ids - taskId + jobid
+ public void updateJobDetails(JobDetails jobDetails, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ String taskId = (String) ids.getTopLevelIdentifier();
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail(taskId);
+ String jobId = (String) ids.getSecondLevelIdentifier();
+ JobDetailResource jobDetail = taskDetail.getJobDetail(jobId);
+ jobDetail.setTaskId(taskDetail.getTaskId());
+ jobDetail.setJobDescription(jobDetails.getJobDescription());
+ jobDetail.setCreationTime(AiravataUtils.getTime(jobDetails.getCreationTime()));
+ jobDetail.setComputeResourceConsumed(jobDetails.getComputeResourceConsumed());
+ jobDetail.setJobName(jobDetails.getJobName());
+ jobDetail.setWorkingDir(jobDetails.getWorkingDir());
+ jobDetail.save();
+ JobStatus jobStatus = jobDetails.getJobStatus();
+ if (jobStatus != null) {
+ JobStatus status = getJobStatus(ids);
+ if (status != null) {
+ updateJobStatus(jobStatus, ids);
+ } else {
+ addJobStatus(jobStatus, ids);
+ }
+ }
+ ApplicationStatus applicationStatus = jobDetails.getApplicationStatus();
+ if (applicationStatus != null) {
+ ApplicationStatus appStatus = getApplicationStatus(ids);
+ if (appStatus != null) {
+ updateApplicationStatus(applicationStatus, jobId);
+ } else {
+ addApplicationStatus(applicationStatus, ids);
+ }
+ }
+ List<ErrorDetails> errors = jobDetails.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
+ addErrorDetails(error, jobId);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error while updating job details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addDataTransferDetails(DataTransferDetails transferDetails, String taskId) throws RegistryException {
+ try {
+ if (transferDetails.getTransferDescription() == null){
+ throw new RegistryException("Data transfer description cannot be empty");
+ }
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = workflowNode.getTaskDetail(taskId);
+ DataTransferDetailResource resource = (DataTransferDetailResource) taskDetail.create(ResourceType.DATA_TRANSFER_DETAIL);
+ resource.setTaskId(taskId);
+ resource.setTransferId(getDataTransferID(taskId));
+
+ resource.setTransferDescription(transferDetails.getTransferDescription());
+ resource.setCreationTime(AiravataUtils.getTime(transferDetails.getCreationTime()));
+ resource.save();
+ String transferId = resource.getTransferId();
+ TransferStatus transferStatus = transferDetails.getTransferStatus();
+ if (transferStatus != null) {
+ TransferStatus status = getDataTransferStatus(transferId);
+ if (status != null) {
+ updateTransferStatus(transferStatus, transferId);
+ } else {
+ CompositeIdentifier ids = new CompositeIdentifier(taskId, transferId);
+ addTransferStatus(transferStatus, ids);
+ }
+ }
+ return resource.getTransferId();
+ } catch (Exception e) {
+ logger.error("Error while adding transfer details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String updateDataTransferDetails(DataTransferDetails transferDetails, String transferId) throws RegistryException {
+ try {
+ ExperimentResource experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ WorkflowNodeDetailResource workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = (TaskDetailResource) workflowNode.create(ResourceType.TASK_DETAIL);
+ DataTransferDetailResource resource = taskDetail.getDataTransferDetail(transferId);
+// resource.setTaskDetailResource(taskDetail);
+ resource.setTransferDescription(transferDetails.getTransferDescription());
+ resource.setCreationTime(AiravataUtils.getTime(transferDetails.getCreationTime()));
+ resource.save();
+ String taskId = resource.getTaskId();
+ TransferStatus transferStatus = transferDetails.getTransferStatus();
+ if (transferStatus != null) {
+ TransferStatus status = getDataTransferStatus(transferId);
+ if (status != null) {
+ updateTransferStatus(transferStatus, transferId);
+ } else {
+ CompositeIdentifier ids = new CompositeIdentifier(taskId, transferId);
+ addTransferStatus(transferStatus, ids);
+ }
+ }
+ return resource.getTransferId();
+ } catch (Exception e) {
+ logger.error("Error while updating transfer details...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ /**
+ * @param scheduling computational resource object
+ * @param ids contains expId and taskId, if it is an experiment, task id can be null
+ * @return scheduling id
+ */
+ public String addComputationalResourceScheduling(ComputationalResourceScheduling scheduling, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment((String) ids.getTopLevelIdentifier());
+ ComputationSchedulingResource schedulingResource = (ComputationSchedulingResource) experiment.create(ResourceType.COMPUTATIONAL_RESOURCE_SCHEDULING);
+ if (ids.getSecondLevelIdentifier() != null) {
+ WorkflowNodeDetailResource nodeDetailResource = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = nodeDetailResource.getTaskDetail((String) ids.getSecondLevelIdentifier());
+ schedulingResource.setTaskId(taskDetail.getTaskId());
+ }
+ schedulingResource.setExperimentId(experiment.getExpID());
+ schedulingResource.setResourceHostId(scheduling.getResourceHostId());
+ schedulingResource.setCpuCount(scheduling.getTotalCPUCount());
+ schedulingResource.setNodeCount(scheduling.getNodeCount());
+ schedulingResource.setNumberOfThreads(scheduling.getNumberOfThreads());
+ schedulingResource.setQueueName(scheduling.getQueueName());
+ schedulingResource.setWalltimeLimit(scheduling.getWallTimeLimit());
+ schedulingResource.setJobStartTime(AiravataUtils.getTime(scheduling.getJobStartTime()));
+ schedulingResource.setPhysicalMemory(scheduling.getTotalPhysicalMemory());
+ schedulingResource.setProjectName(scheduling.getComputationalProjectAccount());
+ schedulingResource.save();
+ return String.valueOf(schedulingResource.getSchedulingId());
+ } catch (Exception e) {
+ logger.error("Error while adding scheduling parameters...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ /**
+ * @param dataHandling advanced input data handling object
+ * @param ids contains expId and taskId
+ * @return data handling id
+ */
+ public String addInputDataHandling(AdvancedInputDataHandling dataHandling, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment((String) ids.getTopLevelIdentifier());
+ AdvanceInputDataHandlingResource dataHandlingResource = (AdvanceInputDataHandlingResource) experiment.create(ResourceType.ADVANCE_INPUT_DATA_HANDLING);
+ if (ids.getSecondLevelIdentifier() != null) {
+ WorkflowNodeDetailResource nodeDetailResource = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = nodeDetailResource.getTaskDetail((String) ids.getSecondLevelIdentifier());
+ dataHandlingResource.setTaskId(taskDetail.getTaskId());
+ }
+ dataHandlingResource.setExperimentId(experiment.getExpID());
+ dataHandlingResource.setWorkingDir(dataHandling.getUniqueWorkingDirectory());
+ dataHandlingResource.setWorkingDirParent(dataHandling.getParentWorkingDirectory());
+ dataHandlingResource.setStageInputFiles(dataHandling.isStageInputFilesToWorkingDir());
+ dataHandlingResource.setCleanAfterJob(dataHandling.isCleanUpWorkingDirAfterJob());
+ dataHandlingResource.save();
+ return String.valueOf(dataHandlingResource.getDataHandlingId());
+ } catch (Exception e) {
+ logger.error("Error while adding input data handling...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ /**
+ * @param dataHandling advanced output data handling object
+ * @param ids contains expId and taskId
+ * @return data handling id
+ */
+ public String addOutputDataHandling(AdvancedOutputDataHandling dataHandling, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment((String) ids.getTopLevelIdentifier());
+ AdvancedOutputDataHandlingResource dataHandlingResource = (AdvancedOutputDataHandlingResource) experiment.create(ResourceType.ADVANCE_OUTPUT_DATA_HANDLING);
+ if (ids.getSecondLevelIdentifier() != null) {
+ WorkflowNodeDetailResource nodeDetailResource = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = nodeDetailResource.getTaskDetail((String) ids.getSecondLevelIdentifier());
+ dataHandlingResource.setTaskId(taskDetail.getTaskId());
+ }
+ dataHandlingResource.setExperimentId(experiment.getExpID());
+ dataHandlingResource.setOutputDataDir(dataHandling.getOutputDataDir());
+ dataHandlingResource.setDataRegUrl(dataHandling.getDataRegistryURL());
+ dataHandlingResource.setPersistOutputData(dataHandling.isPersistOutputData());
+ dataHandlingResource.save();
+ return String.valueOf(dataHandlingResource.getOutputDataHandlingId());
+ } catch (Exception e) {
+ logger.error("Error while adding output data handling...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addQosParams(QualityOfServiceParams qosParams, CompositeIdentifier ids) throws RegistryException {
+ try {
+ ExperimentResource experiment = gatewayResource.getExperiment((String) ids.getTopLevelIdentifier());
+ QosParamResource qosParamResource = (QosParamResource) experiment.create(ResourceType.QOS_PARAM);
+ if (ids.getSecondLevelIdentifier() != null) {
+ WorkflowNodeDetailResource nodeDetailResource = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ TaskDetailResource taskDetail = nodeDetailResource.getTaskDetail((String) ids.getSecondLevelIdentifier());
+ qosParamResource.setTaskId(taskDetail.getTaskId());
+ }
+ qosParamResource.setExperimentId(experiment.getExpID());
+ qosParamResource.setStartExecutionAt(qosParams.getStartExecutionAt());
+ qosParamResource.setExecuteBefore(qosParams.getExecuteBefore());
+ qosParamResource.setNoOfRetries(qosParams.getNumberofRetries());
+ qosParamResource.save();
+ return String.valueOf(qosParamResource.getQosId());
+ } catch (Exception e) {
+ logger.error("Error while adding QOS params...", e);
+ throw new RegistryException(e);
+ }
+ }
+
+ public String addErrorDetails(ErrorDetails error, Object id) throws RegistryException {
+ try {
+
+ ErrorDetailResource errorResource = null;
+ ExperimentResource experiment;
+ TaskDetailResource taskDetail;
+ WorkflowNodeDetailResource workflowNode;
+ // figure out the id is an experiment, node task or job
+ if (id instanceof String) {
+ // FIXME : for .12 we only save task related errors
+// if (isExperimentExist((String) id)) {
+// experiment = gatewayResource.getExperiment((String) id);
+// errorResource = (ErrorDetailResource) experiment.create(ResourceType.ERROR_DETAIL);
+// } else if (isWFNodeExist((String) id)) {
+// experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+// workflowNode = experiment.getWorkflowNode((String) id);
+// errorResource = (ErrorDetailResource) workflowNode.create(ResourceType.ERROR_DETAIL);
+// errorResource.setExperimentResource(workflowNode.getExperimentResource());
+// } else
+ if (isTaskDetailExist((String) id)) {
+ experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ taskDetail = workflowNode.getTaskDetail((String) id);
+ errorResource = (ErrorDetailResource) taskDetail.create(ResourceType.ERROR_DETAIL);
+ if (error.getErrorID() != null && !error.getErrorID().equals(experimentModelConstants.DEFAULT_ID)) {
+ List<ErrorDetailResource> errorDetailList = taskDetail.getErrorDetailList();
+ if (errorDetailList != null && !errorDetailList.isEmpty()) {
+ for (ErrorDetailResource errorDetailResource : errorDetailList) {
+ if (errorDetailResource.getErrorId() == Integer.parseInt(error.getErrorID())) {
+ errorResource = errorDetailResource;
+ }
+ }
+ }
+ }
+ errorResource.setTaskId(taskDetail.getTaskId());
+ errorResource.setNodeId(taskDetail.getNodeId());
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ errorResource.setExperimentId(workflowNode.getExperimentId());
+ } else {
+// logger.error("The id provided is not an experiment id or a workflow id or a task id..");
+ }
+ } else if (id instanceof CompositeIdentifier) {
+ CompositeIdentifier cid = (CompositeIdentifier) id;
+ if (isJobDetailExist(cid)) {
+ experiment = (ExperimentResource) gatewayResource.create(ResourceType.EXPERIMENT);
+ workflowNode = (WorkflowNodeDetailResource) experiment.create(ResourceType.WORKFLOW_NODE_DETAIL);
+ taskDetail = workflowNode.getTaskDetail((String) cid.getTopLevelIdentifier());
+ JobDetailResource jobDetail = taskDetail.getJobDetail((String) cid.getSecondLevelIdentifier());
+ errorResource = (ErrorDetailResource) jobDetail.create(ResourceType.ERROR_DETAIL);
+ if (error.getErrorID() != null && !error.getErrorID().equals(experimentModelConstants.DEFAULT_ID)) {
+ List<ErrorDetailResource> errorDetailList = taskDetail.getErrorDetailList();
+ if (errorDetailList != null && !errorDetailList.isEmpty()) {
+ for (ErrorDetailResource errorDetailResource : errorDetailList) {
+ if (errorDetailResource.getErrorId() == Integer.parseInt(error.getErrorID())) {
+ errorResource = errorDetailResource;
+ }
+ }
+ }
+ }
+ errorResource.setTaskId(taskDetail.getTaskId());
+ errorResource.setNodeId(taskDetail.getNodeId());
+ workflowNode = experiment.getWorkflowNode(taskDetail.getNodeId());
+ errorResource.setExperimentId(workflowNode.getExperimentId());
+ } else {
+ logger.error("The id provided is not a job in the system..");
+ }
+ } else {
+// logger.error("The id provided is not an experiment id or a workflow id or a task id or a composite " +
+// "identifier for job..");
+ }
+ if (errorResource != null) {
+ errorResource.setCreationTime(AiravataUtils.getTime(error.getCreationTime()));
+ errorResource.setActualErrorMsg(error.getActualErrorMessage());
+ errorResource.setUserFriendlyErrorMsg(error.getUserFriendlyMessage());
+ if (error.getErrorCategory() != null) {
+ errorResource.setErrorCategory(error.getErrorCategory().toString());
+ }
+ errorResource.setTransientPersistent(error.isTransientOrPersistent());
+ if (error.getCorrectiveAction() != null) {
+ errorResource.setCorrectiveAction(error.getCorrectiveAction().toString());
+ } else {
+ errorResource.setCorrectiveAction(CorrectiveAction.CONTACT_SUPPORT.toString());
+ }
+ if (error.getActionableGroup() != null) {
+ errorResource.setActionableGroup(error.getActionableGroup().toString());
+ } else {
+ errorResource.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS.toString());
+ }
+ errorResource.save();
+ return String.valueOf(errorResource.getErrorId());
+ }
+ } catch (Exception e) {
+ logger.error("Unable to add error details...", e);
+ throw new RegistryException(e);
+ }
+ return null;
+ }
+
+ public String getNodeInstanceID(String nodeName) {
+ String node = nodeName.replaceAll("\\s", "");
+ return node + "_" + UUID.randomUUID();
+ }
+
+ public String getExperimentID(String experimentName) {
+ String exp = experimentName.replaceAll("\\s", "");
+ return exp + "_" + UUID.randomUUID();
+ }
+
+ public String getTaskID(String nodeName) {
<TRUNCATED>