You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/12/18 19:55:38 UTC

[airavata] branch staging updated: Bringing db-event-manager back

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new 78a2216  Bringing db-event-manager back
78a2216 is described below

commit 78a22163a39a9985d34fa635754ebf9064ee8305
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Dec 19 01:25:26 2018 +0530

    Bringing db-event-manager back
---
 modules/db-event-manager/pom.xml                   |  41 ++++++
 .../db/event/manager/DBEventManagerRunner.java     | 143 +++++++++++++++++++++
 .../manager/messaging/DBEventManagerException.java |  42 ++++++
 .../messaging/DBEventManagerMessagingFactory.java  |  76 +++++++++++
 .../messaging/impl/DBEventMessageHandler.java      | 112 ++++++++++++++++
 .../airavata/db/event/manager/utils/Constants.java |  33 +++++
 .../event/manager/utils/DbEventManagerZkUtils.java | 126 ++++++++++++++++++
 modules/db-event-manager/src/test/java/Test.java   |  25 ++++
 modules/distribution/pom.xml                       |   5 +
 pom.xml                                            |   1 +
 10 files changed, 604 insertions(+)

diff --git a/modules/db-event-manager/pom.xml b/modules/db-event-manager/pom.xml
new file mode 100644
index 0000000..ef6b89d
--- /dev/null
+++ b/modules/db-event-manager/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+    the Apache License, Version 2.0 (theƏ "License"); you may not use this file except in compliance with the License. You may
+    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+    in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+    ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+    the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>airavata</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>db-event-manager</artifactId>
+    <packaging>jar</packaging>
+    <name>Airavata DB Event Manager</name>
+    <url>http://airavata.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <!-- Follow the project version, like airavata-commons above, instead of a hard-coded value -->
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
new file mode 100644
index 0000000..e45646f
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.IServer;
+import org.apache.airavata.db.event.manager.messaging.DBEventManagerMessagingFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by Ajinkya on 3/29/17.
+ */
+public class DBEventManagerRunner implements IServer {
+
+    private static final Logger log = LogManager.getLogger(DBEventManagerRunner.class);
+
+    private static final String SERVER_NAME = "DB Event Manager";
+    private static final String SERVER_VERSION = "1.0";
+
+    // Written by the worker thread when start-up fails and read through getStatus(), hence volatile.
+    private volatile ServerStatus status;
+
+    /**
+     * Start required messaging utilities: first the DB event publisher, then the DB event
+     * subscriber. A failure is logged and recorded as {@code ServerStatus.FAILED} so that
+     * {@link #getStatus()} does not keep reporting a successful start.
+     */
+    private void startDBEventManagerRunner() {
+        try{
+            log.info("Starting DB Event manager publisher");
+
+            DBEventManagerMessagingFactory.getDBEventPublisher();
+            log.debug("DB Event manager publisher is running");
+
+            log.info("Starting DB Event manager subscriber");
+
+            DBEventManagerMessagingFactory.getDBEventSubscriber();
+            log.debug("DB Event manager subscriber is listening");
+        } catch (AiravataException e) {
+            log.error("Error starting DB Event Manager.", e);
+            setStatus(ServerStatus.FAILED);
+        }
+    }
+
+    /**
+     * Runs the start-up routine of the given runner on a new thread.
+     *
+     * @param runner the instance whose messaging utilities are started
+     */
+    private static void launchWorker(final DBEventManagerRunner runner) {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                runner.startDBEventManagerRunner();
+            }
+        }).start();
+    }
+
+
+    /**
+     * The main method. Starts a stand-alone runner on a worker thread.
+     *
+     * @param args the arguments (not used)
+     */
+    public static void main(String[] args) {
+        try {
+            // start the worker thread
+            log.info("Starting the DB Event Manager runner.");
+            launchWorker(new DBEventManagerRunner());
+        } catch (Exception ex) {
+            log.error("Something went wrong with the DB Event Manager runner. Error: " + ex, ex);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return SERVER_NAME;
+    }
+
+    @Override
+    public String getVersion() {
+        return SERVER_VERSION;
+    }
+
+    /**
+     * Starts the messaging utilities of this instance on a worker thread.
+     * The status becomes STARTED before the thread is launched, so a FAILED status
+     * written by the worker is never overwritten afterwards.
+     */
+    @Override
+    public void start() throws Exception {
+
+        try {
+            log.info("Starting the DB Event Manager runner.");
+            setStatus(ServerStatus.STARTED);
+            // start the worker thread on this instance so that its failures reach getStatus()
+            launchWorker(this);
+        } catch (Exception ex) {
+            log.error("Something went wrong with the DB Event Manager runner. Error: " + ex, ex);
+            setStatus(ServerStatus.FAILED);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+
+        // TODO: implement stopping the DBEventManager
+    }
+
+    /**
+     * Calls {@link #stop()} followed by {@link #start()}.
+     */
+    @Override
+    public void restart() throws Exception {
+
+        stop();
+        start();
+    }
+
+    @Override
+    public void configure() throws Exception {
+
+    }
+
+    @Override
+    public ServerStatus getStatus() throws Exception {
+        return status;
+    }
+
+    /**
+     * Stores the new status and refreshes its timestamp.
+     */
+    private void setStatus(ServerStatus stat){
+        status=stat;
+        // use the argument, not the field, in case another thread replaces the field in between
+        stat.updateTime();
+    }
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerException.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerException.java
new file mode 100644
index 0000000..fcdc03e
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerException.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.messaging;
+
+/**
+ * Created by Ajinkya on 3/14/17.
+ */
+public class DBEventManagerException extends Exception {
+
+    private static final long serialVersionUID = -2849422320139467602L;
+
+    /**
+     * Wraps the given cause.
+     *
+     * @param e the underlying cause
+     */
+    public DBEventManagerException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with a message and no cause.
+     *
+     * @param message description of the failure
+     */
+    public DBEventManagerException(String message) {
+        // super(message, null) would mark the cause as already initialised,
+        // making a later initCause() call fail with IllegalStateException.
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a message and the underlying cause.
+     *
+     * @param message description of the failure
+     * @param e the underlying cause
+     */
+    public DBEventManagerException(String message, Throwable e) {
+        super(message, e);
+    }
+
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerMessagingFactory.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerMessagingFactory.java
new file mode 100644
index 0000000..517ea4e
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerMessagingFactory.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.messaging;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.DBEventService;
+import org.apache.airavata.db.event.manager.messaging.impl.DBEventMessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Ajinkya on 3/1/17.
+ */
+public class DBEventManagerMessagingFactory {
+
+    private final static Logger log = LoggerFactory.getLogger(DBEventManagerMessagingFactory.class);
+
+    // Both fields are read outside the class lock in the getters below, so they must be
+    // volatile for a thread to see a completely constructed instance.
+    private static volatile Subscriber dbEventSubscriber;
+
+    private static volatile Publisher dbEventPublisher;
+
+    /**
+     * Get DB Event subscriber, creating it on first use with a new {@link DBEventMessageHandler}.
+     *
+     * @return the shared subscriber instance
+     * @throws AiravataException if the subscriber (or its message handler) cannot be created
+     */
+    public static Subscriber getDBEventSubscriber() throws AiravataException {
+        if(null == dbEventSubscriber){
+            synchronized (DBEventManagerMessagingFactory.class){
+                // re-check under the lock: another thread may have created it meanwhile
+                if(null == dbEventSubscriber){
+                    log.info("Creating DB Event subscriber.....");
+                    dbEventSubscriber = MessagingFactory.getDBEventSubscriber(new DBEventMessageHandler(), DBEventService.DB_EVENT.toString());
+                    log.info("DB Event subscriber created");
+                }
+            }
+        }
+        return dbEventSubscriber;
+    }
+
+
+    /**
+     * Get DB Event publisher, creating it on first use.
+     *
+     * @return the shared publisher instance
+     * @throws AiravataException if the publisher cannot be created
+     */
+    public static Publisher getDBEventPublisher() throws AiravataException {
+        if(null == dbEventPublisher){
+            synchronized (DBEventManagerMessagingFactory.class){
+                // re-check under the lock: another thread may have created it meanwhile
+                if(null == dbEventPublisher){
+                    log.info("Creating DB Event publisher.....");
+                    dbEventPublisher = MessagingFactory.getDBEventPublisher();
+                    log.info("DB Event publisher created");
+                }
+            }
+        }
+        return dbEventPublisher;
+    }
+
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/impl/DBEventMessageHandler.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/impl/DBEventMessageHandler.java
new file mode 100644
index 0000000..95fdea6
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/impl/DBEventMessageHandler.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.messaging.impl;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.db.event.manager.messaging.DBEventManagerException;
+import org.apache.airavata.db.event.manager.messaging.DBEventManagerMessagingFactory;
+import org.apache.airavata.db.event.manager.utils.DbEventManagerZkUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.dbevent.DBEventMessage;
+import org.apache.airavata.model.dbevent.DBEventMessageContext;
+import org.apache.airavata.model.dbevent.DBEventType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 3/14/17.
+ */
+public class DBEventMessageHandler implements MessageHandler {
+
+    private final static Logger log = LoggerFactory.getLogger(DBEventMessageHandler.class);
+    private CuratorFramework curatorClient;
+
+    /**
+     * Creates the handler and starts the Curator client it uses.
+     *
+     * @throws ApplicationSettingsException if the Curator client cannot be obtained
+     */
+    public DBEventMessageHandler() throws ApplicationSettingsException {
+        startCuratorClient();
+    }
+
+    /**
+     * Fetches the client from {@link DbEventManagerZkUtils#getCuratorClient()} and starts it.
+     */
+    private void startCuratorClient() throws ApplicationSettingsException {
+        curatorClient = DbEventManagerZkUtils.getCuratorClient();
+        curatorClient.start();
+    }
+
+    /**
+     * Handles an incoming DB event message.
+     * <p>
+     * A SUBSCRIBER event registers the subscriber under the publisher in ZooKeeper. A PUBLISHER
+     * event is re-published with a routing key built from the publisher's registered subscribers.
+     * The message is acknowledged only when processing completes without an exception; any
+     * exception is logged and not rethrown.
+     *
+     * @param messageContext the received message
+     */
+    @Override
+    public void onMessage(MessageContext messageContext) {
+
+        log.info("Incoming DB event message. Message Id : " + messageContext.getMessageId());
+        try {
+
+            // round-trip through bytes to obtain the event as a DBEventMessage
+            byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+
+            DBEventMessage dbEventMessage = new DBEventMessage();
+            ThriftUtils.createThriftFromBytes(bytes, dbEventMessage);
+
+            DBEventMessageContext dBEventMessageContext = dbEventMessage.getMessageContext();
+
+            switch (dbEventMessage.getDbEventType()){
+
+                case SUBSCRIBER:
+                    log.info("Registering " + dBEventMessageContext.getSubscriber().getSubscriberService() + " subscriber for " + dbEventMessage.getPublisherService());
+                    DbEventManagerZkUtils.createDBEventMgrZkNode(curatorClient, dbEventMessage.getPublisherService(), dBEventMessageContext.getSubscriber().getSubscriberService());
+                    break;
+                case PUBLISHER:
+                    List<String> subscribers = DbEventManagerZkUtils.getSubscribersForPublisher(curatorClient, dbEventMessage.getPublisherService());
+                    if(subscribers.isEmpty()){
+                        log.error("No Subscribers registered for the service");
+                        throw new DBEventManagerException("No Subscribers registered for the service");
+                    }
+                    String routingKey = getRoutingKeyFromList(subscribers);
+                    log.info("Publishing " + dbEventMessage.getPublisherService() + " db event to " + subscribers.toString());
+                    MessageContext messageCtx = new MessageContext(dbEventMessage, MessageType.DB_EVENT, "", "");
+                    messageCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                    DBEventManagerMessagingFactory.getDBEventPublisher().publish(messageCtx, routingKey);
+                    break;
+            }
+
+            log.info("Sending ack. Message Delivery Tag : " + messageContext.getDeliveryTag());
+            DBEventManagerMessagingFactory.getDBEventSubscriber().sendAck(messageContext.getDeliveryTag());
+
+        } catch (Exception e) {
+            log.error("Error processing message.", e);
+        }
+    }
+
+    /**
+     * Builds the routing key: the subscriber names in sorted order, joined by
+     * {@code DBEventManagerConstants.ROUTING_KEY_SEPARATOR}.
+     * <p>
+     * The separator is written only between names, so the result is correct for a separator
+     * of any length and an empty list yields an empty key instead of an exception.
+     * The given list is sorted in place.
+     *
+     * @param subscribers subscriber names; sorted in place by this method
+     * @return the routing key, or an empty string for an empty list
+     */
+    private String getRoutingKeyFromList(final List<String> subscribers){
+        Collections.sort(subscribers);
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        for(String subscriber : subscribers){
+            if(!first){
+                sb.append(DBEventManagerConstants.ROUTING_KEY_SEPARATOR);
+            }
+            sb.append(subscriber);
+            first = false;
+        }
+        return sb.toString();
+    }
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/Constants.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/Constants.java
new file mode 100644
index 0000000..e2db407
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/Constants.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 3/1/17.
+ */
+public class Constants {
+
+    /** First path segment under which DbEventManagerZkUtils builds its ZooKeeper paths. */
+    public static final String DB_EVENT_MGR_ZK_PATH = "db-event-mgr";
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/DbEventManagerZkUtils.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/DbEventManagerZkUtils.java
new file mode 100644
index 0000000..b5ecfd1
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/DbEventManagerZkUtils.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by goshenoy on 3/21/17.
+ */
+public class DbEventManagerZkUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(DbEventManagerZkUtils.class);
+    // Read outside the class lock in getCuratorClient(), so it must be volatile.
+    private static volatile CuratorFramework curatorClient;
+
+    /**
+     * Get the shared curatorFramework instance, creating it on first use from
+     * {@code ServerSettings.getZookeeperConnection()} with an
+     * {@code ExponentialBackoffRetry(1000, 5)} retry policy.
+     * This method does not start the client.
+     *
+     * @return the shared client instance
+     * @throws ApplicationSettingsException if the ZooKeeper connection setting cannot be read
+     */
+    public static CuratorFramework getCuratorClient() throws ApplicationSettingsException {
+        if (curatorClient == null) {
+            synchronized (DbEventManagerZkUtils.class) {
+                // re-check under the lock: another thread may have created it meanwhile
+                if (curatorClient == null) {
+                    String connectionString = ServerSettings.getZookeeperConnection();
+                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+                    curatorClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+                }
+            }
+        }
+
+        return curatorClient;
+    }
+
+    /**
+     * Create Zk node for db event manager and store each node's own name as its data.
+     *
+     * @param curatorClient client used to talk to ZooKeeper
+     * @param publisherNode name of the publisher node
+     * @param subscriberNode name of the subscriber node, created below the publisher node
+     * @throws Exception if a ZooKeeper operation fails
+     */
+    public static void createDBEventMgrZkNode(CuratorFramework curatorClient, String publisherNode, String subscriberNode) throws Exception {
+        // get pub,sub queue names
+
+        // construct ZK paths for pub,sub
+        String publisherZkPath = ZKPaths.makePath(Constants.DB_EVENT_MGR_ZK_PATH, publisherNode);
+        String subscriberZkPath = ZKPaths.makePath(publisherZkPath, subscriberNode);
+
+        // construct byte-data(s) for pub, sub; fixed charset so the bytes do not depend on the platform default
+        byte[] publisherZkData = publisherNode.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        byte[] subscriberZkData = subscriberNode.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+        // create zkNode: "/db-event-mgr/pubqueuename/subqueueename"
+        logger.debug("Creating Zk node for db-event-mgr: " + subscriberZkPath);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), subscriberZkPath);
+
+        // set zkNode data for pub,sub
+        curatorClient.setData().withVersion(-1 ).forPath(publisherZkPath, publisherZkData);
+        curatorClient.setData().withVersion(-1 ).forPath(subscriberZkPath, subscriberZkData);
+    }
+
+    /**
+     * Get list of subscribers for given publisher, i.e. the children of the publisher's Zk node.
+     *
+     * @param curatorClient client used to talk to ZooKeeper
+     * @param publisherNode name of the publisher node
+     * @return names of the child nodes of the publisher node
+     * @throws Exception if the children cannot be read
+     */
+    public static List<String> getSubscribersForPublisher(CuratorFramework curatorClient, String publisherNode) throws Exception {
+
+        // construct ZK path for pub
+        String publisherZkPath = ZKPaths.makePath(Constants.DB_EVENT_MGR_ZK_PATH, publisherNode);
+
+        // get children-list for pub
+        return curatorClient.getChildren().forPath(publisherZkPath);
+    }
+
+//    public static void main(String[] args) {
+//        String connectionString = "localhost:2181";
+//        String userProfileService = DBEventManagerConstants.DBEventService.USER_PROFILE.toString();
+//        String sharingService = DBEventManagerConstants.DBEventService.SHARING.toString();
+//        String registryService = DBEventManagerConstants.DBEventService.REGISTRY.toString();
+//
+//        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+//
+//        CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+//        curatorClient.start();
+//        try {
+//            DbEventManagerZkUtils.createDBEventMgrZkNode(curatorClient, userProfileService, sharingService);
+//            DbEventManagerZkUtils.createDBEventMgrZkNode(curatorClient, userProfileService, registryService);
+//            System.out.println(DbEventManagerZkUtils.getSubscribersForPublisher(curatorClient, userProfileService));
+//        } catch (Exception ex) {
+//            ex.printStackTrace();
+//        }
+//    }
+}
diff --git a/modules/db-event-manager/src/test/java/Test.java b/modules/db-event-manager/src/test/java/Test.java
new file mode 100644
index 0000000..fc019d9
--- /dev/null
+++ b/modules/db-event-manager/src/test/java/Test.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+/**
+ * Created by Ajinkya on 3/1/17.
+ */
+public class Test {
+    // Intentionally empty: this class declares no members and no test cases.
+}
diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml
index 00ebc0a..25d5b45 100644
--- a/modules/distribution/pom.xml
+++ b/modules/distribution/pom.xml
@@ -446,6 +446,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>db-event-manager</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <url>http://airavata.apache.org/</url>
diff --git a/pom.xml b/pom.xml
index add1bcf..8fc84a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -713,6 +713,7 @@
                 <module>modules/cluster-monitoring</module>
                 <module>modules/user-profile-migration</module>
                 <module>airavata-services</module>
+                <module>modules/db-event-manager</module>
                 <module>modules/airavata-helix</module>
                 <module>modules/compute-account-provisioning</module>
                 <module>modules/job-monitor</module>