You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/06/13 20:25:36 UTC

[airavata] branch develop updated: Implementing process workflow related methods in RegistryServerHandler

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new e1de154  Implementing process workflow related methods in RegistryServerHandler
e1de154 is described below

commit e1de154549afa9baf57294c001c2c8bc53bdde4f
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jun 13 16:25:27 2019 -0400

    Implementing process workflow related methods in RegistryServerHandler
---
 .../src/main/resources/META-INF/persistence.xml    |   1 +
 .../repositories/expcatalog/JobRepository.java     |   7 ++
 .../expcatalog/ProcessWorkflowRepository.java      |  52 ++++++++++
 .../registry/core/utils/QueryConstants.java        |   2 +
 .../api/service/handler/RegistryServerHandler.java | 113 ++++++++-------------
 5 files changed, 107 insertions(+), 68 deletions(-)

diff --git a/modules/ide-integration/src/main/resources/META-INF/persistence.xml b/modules/ide-integration/src/main/resources/META-INF/persistence.xml
index 37df5ac..7ace098 100644
--- a/modules/ide-integration/src/main/resources/META-INF/persistence.xml
+++ b/modules/ide-integration/src/main/resources/META-INF/persistence.xml
@@ -123,6 +123,7 @@
         <class>org.apache.airavata.registry.core.entities.expcatalog.ProcessOutputEntity</class>
         <class>org.apache.airavata.registry.core.entities.expcatalog.ProcessResourceScheduleEntity</class>
         <class>org.apache.airavata.registry.core.entities.expcatalog.ProcessStatusEntity</class>
+        <class>org.apache.airavata.registry.core.entities.expcatalog.ProcessWorkflowEntity</class>
         <class>org.apache.airavata.registry.core.entities.expcatalog.ProjectEntity</class>
         <class>org.apache.airavata.registry.core.entities.expcatalog.ProjectUserEntity</class>
         <class>org.apache.airavata.registry.core.entities.expcatalog.QueueStatusEntity</class>
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/JobRepository.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/JobRepository.java
index 3744913..cb37438 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/JobRepository.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/JobRepository.java
@@ -124,6 +124,13 @@ public class JobRepository extends ExpCatAbstractRepository<JobModel, JobEntity,
             jobModelList = jobRepository.select(QueryConstants.GET_JOB_FOR_TASK_ID, -1, 0, queryParameters);
         }
 
+        else if (fieldName.equals(DBConstants.Job.JOB_ID)) {
+            logger.debug("Search criteria is JobId");
+            Map<String, Object> queryParameters = new HashMap<>();
+            queryParameters.put(DBConstants.Job.JOB_ID, value);
+            jobModelList = jobRepository.select(QueryConstants.GET_JOB_FOR_JOB_ID, -1, 0, queryParameters);
+        }
+
         else {
             logger.error("Unsupported field name for Job module.");
             throw new IllegalArgumentException("Unsupported field name for Job module.");
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ProcessWorkflowRepository.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ProcessWorkflowRepository.java
new file mode 100644
index 0000000..14a4efc
--- /dev/null
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ProcessWorkflowRepository.java
@@ -0,0 +1,52 @@
+package org.apache.airavata.registry.core.repositories.expcatalog;
+
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.process.ProcessWorkflow;
+import org.apache.airavata.registry.core.entities.expcatalog.ProcessWorkflowEntity;
+import org.apache.airavata.registry.core.entities.expcatalog.ProcessWorkflowPK;
+import org.apache.airavata.registry.core.utils.ObjectMapperSingleton;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.dozer.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProcessWorkflowRepository extends ExpCatAbstractRepository<ProcessWorkflow, ProcessWorkflowEntity, ProcessWorkflowPK> {
+    // Repository for the ProcessWorkflow records of a process: writes merge entities here, reads go through ProcessRepository.
+    private static final Logger logger = LoggerFactory.getLogger(ProcessWorkflowRepository.class);
+    /** Passes the ProcessWorkflow model class and its entity class to the base repository. */
+    public ProcessWorkflowRepository() {
+        super(ProcessWorkflow.class, ProcessWorkflowEntity.class);
+    }
+    /** Maps each workflow to its entity, fills in a missing process id with {@code processId}, and merges the entity. */
+    protected void saveProcessWorkflow(List<ProcessWorkflow> processWorkflows, String processId) throws RegistryException {
+        Mapper mapper = ObjectMapperSingleton.getInstance();
+        for (ProcessWorkflow processWorkflow : processWorkflows) {
+            ProcessWorkflowEntity processWorkflowEntity = mapper.map(processWorkflow, ProcessWorkflowEntity.class);
+
+            // A process id already present on the mapped entity is kept; the processId argument is only a fallback.
+            if (processWorkflowEntity.getProcessId() == null) {
+                logger.debug("Setting the ProcessWorkflowEntity's ProcessId");
+                processWorkflowEntity.setProcessId(processId);
+            }
+            execute(entityManager -> entityManager.merge(processWorkflowEntity));
+        }
+    }
+    /** Saves a single workflow via {@link #saveProcessWorkflow} and returns the {@code processId} argument unchanged. */
+    public String addProcessWorkflow(ProcessWorkflow processWorkflow, String processId) throws RegistryException {
+        saveProcessWorkflow(Collections.singletonList(processWorkflow), processId);
+        return processId;
+    }
+    /** Saves every workflow in the list via {@link #saveProcessWorkflow}. */
+    public void addProcessWorkflows(List<ProcessWorkflow> processWorkflows, String processId) throws RegistryException {
+        saveProcessWorkflow(processWorkflows, processId);
+    }
+    /** Loads the process through a new ProcessRepository and returns the workflows held on its model. */
+    public List<ProcessWorkflow> getProcessWorkflows(String processId) throws RegistryException {
+        ProcessRepository processRepository = new ProcessRepository();
+        ProcessModel processModel = processRepository.getProcess(processId);
+        return processModel.getProcessWorkflows();
+    }
+}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java
index 7fa29cd..a2c1737 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java
@@ -140,6 +140,8 @@ public interface QueryConstants {
             "WHERE J.processId LIKE :" + DBConstants.Job.PROCESS_ID;
     String GET_JOB_FOR_TASK_ID = "SELECT J FROM " + JobEntity.class.getSimpleName() + " J " +
             "WHERE J.taskId LIKE :" + DBConstants.Job.TASK_ID;
+    String GET_JOB_FOR_JOB_ID = "SELECT J FROM " + JobEntity.class.getSimpleName() + " J " +
+            "WHERE J.jobId LIKE :" + DBConstants.Job.JOB_ID;
 
     String GET_ALL_QUEUE_STATUS_MODELS = "SELECT QSM FROM " + QueueStatusEntity.class.getSimpleName() + " QSM";
 
diff --git a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
index 1966bf5..8f66d1c 100644
--- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
+++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
@@ -96,25 +96,7 @@ import org.apache.airavata.registry.core.repositories.appcatalog.ParserRepositor
 import org.apache.airavata.registry.core.repositories.appcatalog.ParsingTemplateRepository;
 import org.apache.airavata.registry.core.repositories.appcatalog.StorageResourceRepository;
 import org.apache.airavata.registry.core.repositories.appcatalog.UserResourceProfileRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ExperimentErrorRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ExperimentOutputRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ExperimentRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ExperimentStatusRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ExperimentSummaryRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.GatewayRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.JobRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.JobStatusRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.NotificationRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ProcessErrorRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ProcessOutputRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ProcessRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ProcessStatusRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.ProjectRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.QueueStatusRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.TaskErrorRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.TaskRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.TaskStatusRepository;
-import org.apache.airavata.registry.core.repositories.expcatalog.UserRepository;
+import org.apache.airavata.registry.core.repositories.expcatalog.*;
 import org.apache.airavata.registry.core.repositories.replicacatalog.DataProductRepository;
 import org.apache.airavata.registry.core.repositories.replicacatalog.DataReplicaLocationRepository;
 import org.apache.airavata.registry.core.repositories.workflowcatalog.WorkflowRepository;
@@ -136,7 +118,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class RegistryServerHandler implements RegistryService.Iface {
     private final static Logger logger = LoggerFactory.getLogger(RegistryServerHandler.class);
@@ -155,6 +136,7 @@ public class RegistryServerHandler implements RegistryService.Iface {
     private ExperimentErrorRepository experimentErrorRepository = new ExperimentErrorRepository();
     private ProcessRepository processRepository = new ProcessRepository();
     private ProcessOutputRepository processOutputRepository = new ProcessOutputRepository();
+    private ProcessWorkflowRepository processWorkflowRepository = new ProcessWorkflowRepository();
     private ProcessStatusRepository processStatusRepository = new ProcessStatusRepository();
     private ProcessErrorRepository processErrorRepository = new ProcessErrorRepository();
     private TaskRepository taskRepository = new TaskRepository();
@@ -1067,17 +1049,15 @@ public class RegistryServerHandler implements RegistryService.Iface {
     @Override
     public List<JobModel> getJobs(String queryType, String id) throws RegistryServiceException, TException {
 
-        // TODO: reimplement
-        // try {
-        //     return fetchJobModels(queryType, id);
-        // } catch (Exception e) {
-        //     logger.error(id, "Error while retrieving jobs for query " + queryType + " and id " + id, e);
-        //     AiravataSystemException exception = new AiravataSystemException();
-        //     exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
-        //     exception.setMessage("Error while retrieving jobs for query " + queryType + " and id " + id + ". More info : " + e.getMessage());
-        //     throw exception;
-        // }
-        return Collections.emptyList();
+        try {
+            return fetchJobModels(queryType, id);
+        } catch (Exception e) { // any lookup failure is logged and rethrown as an INTERNAL_ERROR AiravataSystemException
+            logger.error("Error while retrieving jobs for query " + queryType + " and id " + id, e);
+            AiravataSystemException exception = new AiravataSystemException();
+            exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
+            exception.setMessage("Error while retrieving jobs for query " + queryType + " and id " + id + ". More info : " + e.getMessage());
+            throw exception;
+        }
     }
 
     private JobModel fetchJobModel(String queryType, String id) throws RegistryException {
@@ -1104,26 +1084,28 @@ public class RegistryServerHandler implements RegistryService.Iface {
         return null;
     }
 
-    // TODO: reimplement
-    // private List<JobModel> fetchJobModels(String queryType, String id) throws RegistryException {
-    //     experimentCatalog = RegistryFactory.getDefaultExpCatalog();
-    //     List<Object> jobs = new ArrayList<>();
-    //     if (queryType.equals(Constants.FieldConstants.JobConstants.TASK_ID)) {
-    //         jobs = experimentCatalog.get(ExperimentCatalogModelType.JOB, Constants.FieldConstants.JobConstants.TASK_ID, id);
-    //     } else if (queryType.equals(Constants.FieldConstants.JobConstants.PROCESS_ID)) {
-    //         jobs = experimentCatalog.get(ExperimentCatalogModelType.JOB, Constants.FieldConstants.JobConstants.PROCESS_ID, id);
-    //     } else if (queryType.equals(Constants.FieldConstants.JobConstants.JOB_ID)) {
-    //         jobs = experimentCatalog.get(ExperimentCatalogModelType.JOB, Constants.FieldConstants.JobConstants.JOB_ID, id);
-    //     }
-    //     return jobs.stream().map(obj -> (JobModel)obj).collect(Collectors.toList());
-    // }
+    /**
+     * Asks the job repository for the jobs selected by a task id, process id or job id field and the given id.
+     * A queryType naming any other field is not passed to the repository and yields a new empty list.
+     */
+    private List<JobModel> fetchJobModels(String queryType, String id) throws RegistryException {
+        switch (queryType) {
+            case Constants.FieldConstants.JobConstants.TASK_ID:
+            case Constants.FieldConstants.JobConstants.PROCESS_ID:
+            case Constants.FieldConstants.JobConstants.JOB_ID:
+                // queryType equals the matched constant here, so it can be handed on as the field name.
+                return jobRepository.getJobList(queryType, id);
+            default:
+                return new ArrayList<>();
+        }
+    }
 
     @Override
     public List<OutputDataObjectType> getProcessOutputs(String processId) throws RegistryServiceException, TException {
         try {
             return processOutputRepository.getProcessOutputs(processId);
         } catch (Exception e) {
-            logger.error(processId, "Error while retrieving process outputs", e);
+            logger.error("Error while retrieving process outputs for process id " + processId, e);
             AiravataSystemException exception = new AiravataSystemException();
             exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
             exception.setMessage("Error while retrieving process outputs. More info : " + e.getMessage());
@@ -1134,33 +1116,28 @@ public class RegistryServerHandler implements RegistryService.Iface {
     @Override
     public List<ProcessWorkflow> getProcessWorkflows(String processId) throws RegistryServiceException, TException {
 
-        // try {
-        //     experimentCatalog = RegistryFactory.getDefaultExpCatalog();
-        //     return (List<ProcessWorkflow>) experimentCatalog.get(ExperimentCatalogModelType.PROCESS_WORKFLOW, processId);
-        // } catch (Exception e) {
-        //     logger.error(processId, "Error while retrieving process workflows for process " + processId, e);
-        //     AiravataSystemException exception = new AiravataSystemException();
-        //     exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
-        //     exception.setMessage("Error while retrieving process workflows for process " + processId
-        //             + ". More info : " + e.getMessage());
-        //     throw exception;
-        // }
-        return Collections.emptyList();
+        try {
+            return processWorkflowRepository.getProcessWorkflows(processId); // the lookup is delegated to the repository
+        } catch (Exception e) { // any failure is logged and rethrown as an INTERNAL_ERROR AiravataSystemException
+            logger.error("Error while retrieving process workflows for process id " + processId, e);
+            AiravataSystemException exception = new AiravataSystemException();
+            exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
+            exception.setMessage("Error while retrieving process workflows for process id "+ processId + ". More info : " + e.getMessage()); // only e.getMessage() is carried over; the cause is not attached
+            throw exception;
+        }
     }
 
     @Override
     public void addProcessWorkflow(ProcessWorkflow processWorkflow) throws RegistryServiceException, TException {
-        // try {
-        //     experimentCatalog = RegistryFactory.getDefaultExpCatalog();
-        //     experimentCatalog.add(ExpCatChildDataType.PROCESS_WORKFLOW, processWorkflow, processWorkflow.getProcessId());
-        // } catch (Exception e) {
-        //     logger.error("Error while adding process workflow for process " + processWorkflow.getProcessId(), e);
-        //     AiravataSystemException exception = new AiravataSystemException();
-        //     exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
-        //     exception.setMessage("Error while adding process workflow for process id " + processWorkflow.getProcessId()
-        //             + ". More info : " + e.getMessage());
-        //     throw exception;
-        // }
+        try {
+            processWorkflowRepository.addProcessWorkflow(processWorkflow, processWorkflow.getProcessId()); // the workflow's own process id is passed as the owning process
+        } catch (Exception e) { // any failure is logged and rethrown as an INTERNAL_ERROR AiravataSystemException
+            logger.error("Error while adding process workflows for process id " + processWorkflow.getProcessId(), e);
+            AiravataSystemException exception = new AiravataSystemException();
+            exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR);
+            exception.setMessage("Error while adding process workflows for process id "+ processWorkflow.getProcessId() + ". More info : " + e.getMessage()); // only e.getMessage() is carried over; the cause is not attached
+            throw exception;
+        }
     }
 
     @Override