You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2019/04/03 19:07:06 UTC

[airavata] branch develop updated: AIRAVATA-3002 Use db event to sync default project to sharing service

This is an automated email from the ASF dual-hosted git repository.

machristie pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 770695d  AIRAVATA-3002 Use db event to sync default project to sharing service
770695d is described below

commit 770695d7282b0c788316d0705e86b36e635652c5
Author: Marcus Christie <ma...@apache.org>
AuthorDate: Wed Apr 3 15:03:33 2019 -0400

    AIRAVATA-3002 Use db event to sync default project to sharing service
---
 .../messaging/core/util/DBEventPublisherUtils.java | 96 ++++++++++++++++++++++
 .../messaging/RegistryServiceDBEventHandler.java   | 32 +++-----
 .../messaging/SharingServiceDBEventHandler.java    | 48 +++++++++++
 .../airavata/sharing/registry/utils/Constants.java |  7 +-
 4 files changed, 159 insertions(+), 24 deletions(-)

diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
new file mode 100644
index 0000000..61b861c
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
@@ -0,0 +1,96 @@
+package org.apache.airavata.messaging.core.util;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.DBEventService;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.dbevent.CrudType;
+import org.apache.airavata.model.dbevent.DBEventMessage;
+import org.apache.airavata.model.dbevent.DBEventMessageContext;
+import org.apache.airavata.model.dbevent.DBEventPublisher;
+import org.apache.airavata.model.dbevent.DBEventPublisherContext;
+import org.apache.airavata.model.dbevent.DBEventType;
+import org.apache.airavata.model.dbevent.EntityType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DBEventPublisherUtils
+ */
+public class DBEventPublisherUtils {
+
+    private final static Logger logger = LoggerFactory.getLogger(DBEventPublisherUtils.class);
+    private static Publisher dbEventPublisher = null;
+
+    /**
+     * Publish DB Event for given entity.
+     * @param entityType
+     * @param crudType
+     * @param entityModel
+     */
+    public static void publish(EntityType entityType, CrudType crudType, TBase entityModel) throws AiravataException {
+
+        DBEventPublisherUtils.getDbEventPublisher().publish(
+                DBEventPublisherUtils.getDBEventMessageContext(entityType, crudType, entityModel),
+                DBEventManagerConstants.getRoutingKey(DBEventService.DB_EVENT.toString()));
+    }
+
+    /**
+     * Returns singleton instance of dbEventPublisher
+     * @return
+     * @throws AiravataException
+     */
+    private static Publisher getDbEventPublisher() throws AiravataException {
+        if(null == dbEventPublisher){
+            synchronized (DBEventPublisherUtils.class){
+                if(null == dbEventPublisher){
+                    logger.info("Creating DB Event publisher.....");
+                    dbEventPublisher = MessagingFactory.getDBEventPublisher();
+                    logger.info("DB Event publisher created");
+                }
+            }
+        }
+        return dbEventPublisher;
+    }
+
+    /**
+     * Constructs the dbEventMessageContext
+     * @param entityType
+     * @param crudType
+     * @param entityModel
+     * @return
+     * @throws AiravataException
+     */
+    private static MessageContext getDBEventMessageContext(EntityType entityType, CrudType crudType, TBase entityModel) throws AiravataException {
+        try {
+            // set the publisherContext
+            DBEventMessage dbEventMessage = new DBEventMessage();
+            DBEventPublisherContext publisherContext = new DBEventPublisherContext();
+            publisherContext.setCrudType(crudType);
+            publisherContext.setEntityDataModel(ThriftUtils.serializeThriftObject(entityModel));
+            publisherContext.setEntityType(entityType);
+
+            // create dbEventPublisher with publisherContext
+            DBEventPublisher dbEventPublisher = new DBEventPublisher();
+            dbEventPublisher.setPublisherContext(publisherContext);
+
+            // set messageContext to dbEventPublisher
+            DBEventMessageContext dbMessageContext = DBEventMessageContext.publisher(dbEventPublisher);
+
+            // set dbEventMessage with messageContext
+            dbEventMessage.setDbEventType(DBEventType.PUBLISHER);
+            dbEventMessage.setPublisherService(DBEventManagerConstants.getDbEventServiceName(entityType));
+            dbEventMessage.setMessageContext(dbMessageContext);
+
+            // construct and return messageContext
+            return new MessageContext(dbEventMessage, MessageType.DB_EVENT, "", "");
+        } catch (Exception ex) {
+            throw new AiravataException(ex.getMessage(), ex);
+        }
+    }
+}
diff --git a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
index c93835e..3bbfe25 100644
--- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
+++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
@@ -22,23 +22,24 @@ package org.apache.airavata.registry.api.service.messaging;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.DBEventService;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftClientPool;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.util.DBEventPublisherUtils;
+import org.apache.airavata.model.dbevent.CrudType;
 import org.apache.airavata.model.dbevent.DBEventMessage;
 import org.apache.airavata.model.dbevent.DBEventPublisherContext;
+import org.apache.airavata.model.dbevent.EntityType;
 import org.apache.airavata.model.error.DuplicateEntryException;
-import org.apache.airavata.model.group.ResourceType;
 import org.apache.airavata.model.user.UserProfile;
 import org.apache.airavata.model.workspace.Gateway;
 import org.apache.airavata.model.workspace.Project;
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.airavata.registry.api.exception.RegistryServiceException;
-import org.apache.airavata.registry.api.service.util.Constants;
-import org.apache.airavata.sharing.registry.models.Entity;
-import org.apache.airavata.sharing.registry.service.cpi.SharingRegistryService;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -53,7 +54,6 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
 
     private static final Logger logger = LoggerFactory.getLogger(RegistryServiceDBEventHandler.class);
     private final ThriftClientPool<RegistryService.Client> registryClientPool;
-    private final ThriftClientPool<SharingRegistryService.Client> sharingClientPool;
 
     public RegistryServiceDBEventHandler() throws ApplicationSettingsException, RegistryServiceException {
 
@@ -66,9 +66,6 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
         poolConfig.numTestsPerEvictionRun = 10;
         poolConfig.maxWait = 3000;
 
-        sharingClientPool = new ThriftClientPool<>(
-                tProtocol -> new SharingRegistryService.Client(tProtocol), poolConfig, ServerSettings.getSharingRegistryHost(),
-                Integer.parseInt(ServerSettings.getSharingRegistryPort()));
         registryClientPool = new ThriftClientPool<>(
                 tProtocol -> new RegistryService.Client(tProtocol), poolConfig, ServerSettings.getRegistryServerHost(),
                 Integer.parseInt(ServerSettings.getRegistryServerPort()));
@@ -90,7 +87,6 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
             logger.info("RegistryService, Replicated Entity: " + publisherContext.getEntityType());
 
             RegistryService.Client registryClient = registryClientPool.getResource();
-            SharingRegistryService.Client sharingClient = sharingClientPool.getResource();
             // this try-block is mainly for catching DuplicateEntryException
             try {
                 // check type of entity-type
@@ -145,17 +141,8 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
                                 Project defaultProject = createDefaultProject(registryClient, userProfile);
                                 if (defaultProject != null) {
 
-                                    // TODO: user may not yet exist in the sharing registry, should we check and try to create it?
-                                    Entity entity = new Entity();
-                                    entity.setEntityId(defaultProject.getProjectID());
-                                    final String domainId = defaultProject.getGatewayId();
-                                    entity.setDomainId(domainId);
-                                    entity.setEntityTypeId(domainId + ":" + ResourceType.PROJECT.name());
-                                    entity.setOwnerId(defaultProject.getOwner() + "@" + domainId);
-                                    entity.setName(defaultProject.getName());
-                                    entity.setDescription(defaultProject.getDescription());
-                                    sharingClient.createEntity(entity);
-                                    logger.info("Default project for {} added to sharing registry", userProfile.getUserId());
+                                    // Publish new PROJECT event (sharing service will listen for it and register this as a shared Entity)
+                                    DBEventPublisherUtils.publish(EntityType.PROJECT, CrudType.CREATE, defaultProject);
                                 }
                                 logger.info("addUser Replication Success!");
                                 break;
@@ -181,14 +168,12 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
                     }
                 }
                 registryClientPool.returnResource(registryClient);
-                sharingClientPool.returnResource(sharingClient);
             } catch (DuplicateEntryException ex) {
                 // log this exception and proceed (do nothing)
                 // this exception is thrown mostly when messages are re-consumed, hence ignore
                 logger.warn("DuplicateEntryException while consuming db-event message, ex: " + ex.getMessage(), ex);
             } catch (Exception ex) {
                 registryClientPool.returnBrokenResource(registryClient);
-                sharingClientPool.returnBrokenResource(sharingClient);
                 throw ex;
             }
             // send ack for received message
@@ -200,6 +185,9 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
             logger.error("Error fetching application settings: " + ex, ex);
         } catch (AiravataException ex) {
             logger.error("Error sending ack. Message Delivery Tag: " + messageContext.getDeliveryTag(), ex);
+        } catch (Throwable t) {
+            // Catch all exceptions types otherwise RabbitMQ's DefaultExceptionHandler will close the channel
+            logger.error("Failed to handle message: " + t, t);
         }
     }
 
diff --git a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java
index 54a1b2f..ddcd77b 100644
--- a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java
+++ b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java
@@ -32,8 +32,10 @@ import org.apache.airavata.model.error.DuplicateEntryException;
 import org.apache.airavata.model.group.ResourceType;
 import org.apache.airavata.model.user.UserProfile;
 import org.apache.airavata.model.workspace.Gateway;
+import org.apache.airavata.model.workspace.Project;
 import org.apache.airavata.sharing.registry.client.SharingRegistryServiceClientFactory;
 import org.apache.airavata.sharing.registry.models.Domain;
+import org.apache.airavata.sharing.registry.models.Entity;
 import org.apache.airavata.sharing.registry.models.PermissionType;
 import org.apache.airavata.sharing.registry.models.SharingRegistryException;
 import org.apache.airavata.sharing.registry.models.User;
@@ -252,6 +254,52 @@ public class SharingServiceDBEventHandler implements MessageHandler {
 
 
                         break;
+                    
+                    case PROJECT :
+                        log.info("Project specific DB Event communicated by " + dbEventMessage.getPublisherService());
+
+                        Project project = new Project();
+                        ThriftUtils.createThriftFromBytes(dBEventMessageContext.getPublisher().getPublisherContext().getEntityDataModel(), project);
+                        final String domainId = project.getGatewayId();
+                        final String entityId = project.getProjectID();
+
+                        switch (dBEventMessageContext.getPublisher().getPublisherContext().getCrudType()){
+
+                            case CREATE:
+                            case UPDATE:
+
+                                Entity entity = new Entity();
+                                entity.setEntityId(entityId);
+                                entity.setDomainId(domainId);
+                                entity.setEntityTypeId(domainId + ":" + ResourceType.PROJECT.name());
+                                entity.setOwnerId(project.getOwner() + "@" + domainId);
+                                entity.setName(project.getName());
+                                entity.setDescription(project.getDescription());
+
+                                if (!sharingRegistryClient.isEntityExists(domainId, entityId)) {
+                                    log.info("Creating project entity. Entity Id : " + entityId);
+                                    sharingRegistryClient.createEntity(entity);
+                                    log.info("Project entity created. Entity Id : " + entityId);
+                                } else {
+                                    log.info("Updating project entity. Entity Id : " + entityId);
+                                    sharingRegistryClient.updateEntity(entity);
+                                    log.info("Project entity updated. Entity Id : " + entityId);
+                                }
+
+                                break;
+
+                            case READ:
+                                log.info("Ignoring READ crud operation for entity type PROJECT");
+                                break;
+
+                            case DELETE:
+                                log.info("Deleting project entity. Entity Id : " + entityId);
+                                sharingRegistryClient.deleteEntity(domainId, entityId);
+                                log.info("Project entity deleted. Entity Id : " + entityId);
+
+                                break;
+                        }
+                        break;
 
                     default: log.error("Handler not defined for " + dBEventMessageContext.getPublisher().getPublisherContext().getEntityType());
                 }
diff --git a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java
index 5c42312..13c335f 100644
--- a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java
+++ b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java
@@ -33,6 +33,9 @@ public class Constants {
      * List of publishers in which sharing service is interested.
      * Add publishers as required
      */
-    public static final List<String> PUBLISHERS = new ArrayList<String>(){{add(DBEventService.USER_PROFILE.toString());
-        add(DBEventService.TENANT.toString());};};
+    public static final List<String> PUBLISHERS = new ArrayList<String>(){{
+        add(DBEventService.USER_PROFILE.toString());
+        add(DBEventService.TENANT.toString());
+        add(DBEventService.REGISTRY.toString());
+    }};
 }