You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2019/04/03 19:07:06 UTC
[airavata] branch develop updated: AIRAVATA-3002 Use db event to
sync default project to sharing service
This is an automated email from the ASF dual-hosted git repository.
machristie pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 770695d AIRAVATA-3002 Use db event to sync default project to sharing service
770695d is described below
commit 770695d7282b0c788316d0705e86b36e635652c5
Author: Marcus Christie <ma...@apache.org>
AuthorDate: Wed Apr 3 15:03:33 2019 -0400
AIRAVATA-3002 Use db event to sync default project to sharing service
---
.../messaging/core/util/DBEventPublisherUtils.java | 96 ++++++++++++++++++++++
.../messaging/RegistryServiceDBEventHandler.java | 32 +++-----
.../messaging/SharingServiceDBEventHandler.java | 48 +++++++++++
.../airavata/sharing/registry/utils/Constants.java | 7 +-
4 files changed, 159 insertions(+), 24 deletions(-)
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
new file mode 100644
index 0000000..61b861c
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
@@ -0,0 +1,96 @@
+package org.apache.airavata.messaging.core.util;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.DBEventService;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.dbevent.CrudType;
+import org.apache.airavata.model.dbevent.DBEventMessage;
+import org.apache.airavata.model.dbevent.DBEventMessageContext;
+import org.apache.airavata.model.dbevent.DBEventPublisher;
+import org.apache.airavata.model.dbevent.DBEventPublisherContext;
+import org.apache.airavata.model.dbevent.DBEventType;
+import org.apache.airavata.model.dbevent.EntityType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DBEventPublisherUtils
+ */
+public class DBEventPublisherUtils {
+
+ private final static Logger logger = LoggerFactory.getLogger(DBEventPublisherUtils.class);
+ private static Publisher dbEventPublisher = null;
+
+ /**
+ * Publish DB Event for given entity.
+ * @param entityType
+ * @param crudType
+ * @param entityModel
+ */
+ public static void publish(EntityType entityType, CrudType crudType, TBase entityModel) throws AiravataException {
+
+ DBEventPublisherUtils.getDbEventPublisher().publish(
+ DBEventPublisherUtils.getDBEventMessageContext(entityType, crudType, entityModel),
+ DBEventManagerConstants.getRoutingKey(DBEventService.DB_EVENT.toString()));
+ }
+
+ /**
+ * Returns singleton instance of dbEventPublisher
+ * @return
+ * @throws AiravataException
+ */
+ private static Publisher getDbEventPublisher() throws AiravataException {
+ if(null == dbEventPublisher){
+ synchronized (DBEventPublisherUtils.class){
+ if(null == dbEventPublisher){
+ logger.info("Creating DB Event publisher.....");
+ dbEventPublisher = MessagingFactory.getDBEventPublisher();
+ logger.info("DB Event publisher created");
+ }
+ }
+ }
+ return dbEventPublisher;
+ }
+
+ /**
+ * Constructs the dbEventMessageContext
+ * @param entityType
+ * @param crudType
+ * @param entityModel
+ * @return
+ * @throws AiravataException
+ */
+ private static MessageContext getDBEventMessageContext(EntityType entityType, CrudType crudType, TBase entityModel) throws AiravataException {
+ try {
+ // set the publisherContext
+ DBEventMessage dbEventMessage = new DBEventMessage();
+ DBEventPublisherContext publisherContext = new DBEventPublisherContext();
+ publisherContext.setCrudType(crudType);
+ publisherContext.setEntityDataModel(ThriftUtils.serializeThriftObject(entityModel));
+ publisherContext.setEntityType(entityType);
+
+ // create dbEventPublisher with publisherContext
+ DBEventPublisher dbEventPublisher = new DBEventPublisher();
+ dbEventPublisher.setPublisherContext(publisherContext);
+
+ // set messageContext to dbEventPublisher
+ DBEventMessageContext dbMessageContext = DBEventMessageContext.publisher(dbEventPublisher);
+
+ // set dbEventMessage with messageContext
+ dbEventMessage.setDbEventType(DBEventType.PUBLISHER);
+ dbEventMessage.setPublisherService(DBEventManagerConstants.getDbEventServiceName(entityType));
+ dbEventMessage.setMessageContext(dbMessageContext);
+
+ // construct and return messageContext
+ return new MessageContext(dbEventMessage, MessageType.DB_EVENT, "", "");
+ } catch (Exception ex) {
+ throw new AiravataException(ex.getMessage(), ex);
+ }
+ }
+}
diff --git a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
index c93835e..3bbfe25 100644
--- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
+++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/messaging/RegistryServiceDBEventHandler.java
@@ -22,23 +22,24 @@ package org.apache.airavata.registry.api.service.messaging;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.DBEventService;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.util.DBEventPublisherUtils;
+import org.apache.airavata.model.dbevent.CrudType;
import org.apache.airavata.model.dbevent.DBEventMessage;
import org.apache.airavata.model.dbevent.DBEventPublisherContext;
+import org.apache.airavata.model.dbevent.EntityType;
import org.apache.airavata.model.error.DuplicateEntryException;
-import org.apache.airavata.model.group.ResourceType;
import org.apache.airavata.model.user.UserProfile;
import org.apache.airavata.model.workspace.Gateway;
import org.apache.airavata.model.workspace.Project;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
-import org.apache.airavata.registry.api.service.util.Constants;
-import org.apache.airavata.sharing.registry.models.Entity;
-import org.apache.airavata.sharing.registry.service.cpi.SharingRegistryService;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -53,7 +54,6 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(RegistryServiceDBEventHandler.class);
private final ThriftClientPool<RegistryService.Client> registryClientPool;
- private final ThriftClientPool<SharingRegistryService.Client> sharingClientPool;
public RegistryServiceDBEventHandler() throws ApplicationSettingsException, RegistryServiceException {
@@ -66,9 +66,6 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
poolConfig.numTestsPerEvictionRun = 10;
poolConfig.maxWait = 3000;
- sharingClientPool = new ThriftClientPool<>(
- tProtocol -> new SharingRegistryService.Client(tProtocol), poolConfig, ServerSettings.getSharingRegistryHost(),
- Integer.parseInt(ServerSettings.getSharingRegistryPort()));
registryClientPool = new ThriftClientPool<>(
tProtocol -> new RegistryService.Client(tProtocol), poolConfig, ServerSettings.getRegistryServerHost(),
Integer.parseInt(ServerSettings.getRegistryServerPort()));
@@ -90,7 +87,6 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
logger.info("RegistryService, Replicated Entity: " + publisherContext.getEntityType());
RegistryService.Client registryClient = registryClientPool.getResource();
- SharingRegistryService.Client sharingClient = sharingClientPool.getResource();
// this try-block is mainly for catching DuplicateEntryException
try {
// check type of entity-type
@@ -145,17 +141,8 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
Project defaultProject = createDefaultProject(registryClient, userProfile);
if (defaultProject != null) {
- // TODO: user may not yet exist in the sharing registry, should we check and try to create it?
- Entity entity = new Entity();
- entity.setEntityId(defaultProject.getProjectID());
- final String domainId = defaultProject.getGatewayId();
- entity.setDomainId(domainId);
- entity.setEntityTypeId(domainId + ":" + ResourceType.PROJECT.name());
- entity.setOwnerId(defaultProject.getOwner() + "@" + domainId);
- entity.setName(defaultProject.getName());
- entity.setDescription(defaultProject.getDescription());
- sharingClient.createEntity(entity);
- logger.info("Default project for {} added to sharing registry", userProfile.getUserId());
+ // Publish new PROJECT event (sharing service will listen for it and register this as a shared Entity)
+ DBEventPublisherUtils.publish(EntityType.PROJECT, CrudType.CREATE, defaultProject);
}
logger.info("addUser Replication Success!");
break;
@@ -181,14 +168,12 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
}
}
registryClientPool.returnResource(registryClient);
- sharingClientPool.returnResource(sharingClient);
} catch (DuplicateEntryException ex) {
// log this exception and proceed (do nothing)
// this exception is thrown mostly when messages are re-consumed, hence ignore
logger.warn("DuplicateEntryException while consuming db-event message, ex: " + ex.getMessage(), ex);
} catch (Exception ex) {
registryClientPool.returnBrokenResource(registryClient);
- sharingClientPool.returnBrokenResource(sharingClient);
throw ex;
}
// send ack for received message
@@ -200,6 +185,9 @@ public class RegistryServiceDBEventHandler implements MessageHandler {
logger.error("Error fetching application settings: " + ex, ex);
} catch (AiravataException ex) {
logger.error("Error sending ack. Message Delivery Tag: " + messageContext.getDeliveryTag(), ex);
+ } catch (Throwable t) {
+ // Catch all exceptions types otherwise RabbitMQ's DefaultExceptionHandler will close the channel
+ logger.error("Failed to handle message: " + t, t);
}
}
diff --git a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java
index 54a1b2f..ddcd77b 100644
--- a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java
+++ b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/messaging/SharingServiceDBEventHandler.java
@@ -32,8 +32,10 @@ import org.apache.airavata.model.error.DuplicateEntryException;
import org.apache.airavata.model.group.ResourceType;
import org.apache.airavata.model.user.UserProfile;
import org.apache.airavata.model.workspace.Gateway;
+import org.apache.airavata.model.workspace.Project;
import org.apache.airavata.sharing.registry.client.SharingRegistryServiceClientFactory;
import org.apache.airavata.sharing.registry.models.Domain;
+import org.apache.airavata.sharing.registry.models.Entity;
import org.apache.airavata.sharing.registry.models.PermissionType;
import org.apache.airavata.sharing.registry.models.SharingRegistryException;
import org.apache.airavata.sharing.registry.models.User;
@@ -252,6 +254,52 @@ public class SharingServiceDBEventHandler implements MessageHandler {
break;
+
+ case PROJECT :
+ log.info("Project specific DB Event communicated by " + dbEventMessage.getPublisherService());
+
+ Project project = new Project();
+ ThriftUtils.createThriftFromBytes(dBEventMessageContext.getPublisher().getPublisherContext().getEntityDataModel(), project);
+ final String domainId = project.getGatewayId();
+ final String entityId = project.getProjectID();
+
+ switch (dBEventMessageContext.getPublisher().getPublisherContext().getCrudType()){
+
+ case CREATE:
+ case UPDATE:
+
+ Entity entity = new Entity();
+ entity.setEntityId(entityId);
+ entity.setDomainId(domainId);
+ entity.setEntityTypeId(domainId + ":" + ResourceType.PROJECT.name());
+ entity.setOwnerId(project.getOwner() + "@" + domainId);
+ entity.setName(project.getName());
+ entity.setDescription(project.getDescription());
+
+ if (!sharingRegistryClient.isEntityExists(domainId, entityId)) {
+ log.info("Creating project entity. Entity Id : " + entityId);
+ sharingRegistryClient.createEntity(entity);
+ log.info("Project entity created. Entity Id : " + entityId);
+ } else {
+ log.info("Updating project entity. Entity Id : " + entityId);
+ sharingRegistryClient.updateEntity(entity);
+ log.info("Project entity updated. Entity Id : " + entityId);
+ }
+
+ break;
+
+ case READ:
+ log.info("Ignoring READ crud operation for entity type PROJECT");
+ break;
+
+ case DELETE:
+ log.info("Deleting project entity. Entity Id : " + entityId);
+ sharingRegistryClient.deleteEntity(domainId, entityId);
+ log.info("Project entity deleted. Entity Id : " + entityId);
+
+ break;
+ }
+ break;
default: log.error("Handler not defined for " + dBEventMessageContext.getPublisher().getPublisherContext().getEntityType());
}
diff --git a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java
index 5c42312..13c335f 100644
--- a/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java
+++ b/modules/sharing-registry/sharing-registry-server/src/main/java/org/apache/airavata/sharing/registry/utils/Constants.java
@@ -33,6 +33,9 @@ public class Constants {
* List of publishers in which sharing service is interested.
* Add publishers as required
*/
- public static final List<String> PUBLISHERS = new ArrayList<String>(){{add(DBEventService.USER_PROFILE.toString());
- add(DBEventService.TENANT.toString());};};
+ public static final List<String> PUBLISHERS = new ArrayList<String>(){{
+ add(DBEventService.USER_PROFILE.toString());
+ add(DBEventService.TENANT.toString());
+ add(DBEventService.REGISTRY.toString());
+ }};
}