You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/12/18 19:55:38 UTC
[airavata] branch staging updated: Bringing db-event-manager back
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new 78a2216 Bringing db-event-manager back
78a2216 is described below
commit 78a22163a39a9985d34fa635754ebf9064ee8305
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Dec 19 01:25:26 2018 +0530
Bringing db-event-manager back
---
modules/db-event-manager/pom.xml | 41 ++++++
.../db/event/manager/DBEventManagerRunner.java | 143 +++++++++++++++++++++
.../manager/messaging/DBEventManagerException.java | 42 ++++++
.../messaging/DBEventManagerMessagingFactory.java | 76 +++++++++++
.../messaging/impl/DBEventMessageHandler.java | 112 ++++++++++++++++
.../airavata/db/event/manager/utils/Constants.java | 33 +++++
.../event/manager/utils/DbEventManagerZkUtils.java | 126 ++++++++++++++++++
modules/db-event-manager/src/test/java/Test.java | 25 ++++
modules/distribution/pom.xml | 5 +
pom.xml | 1 +
10 files changed, 604 insertions(+)
diff --git a/modules/db-event-manager/pom.xml b/modules/db-event-manager/pom.xml
new file mode 100644
index 0000000..ef6b89d
--- /dev/null
+++ b/modules/db-event-manager/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+ the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
+ obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+ in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+ ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+ the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>airavata</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>db-event-manager</artifactId>
+ <packaging>jar</packaging>
+ <name>Airavata DB Event Manager</name>
+ <url>http://airavata.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+     <groupId>org.apache.airavata</groupId>
+     <artifactId>airavata-messaging-core</artifactId>
+     <!-- Sibling module: follow the parent version instead of a hard-coded
+          literal so a version bump cannot leave this dependency behind. -->
+     <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
new file mode 100644
index 0000000..e45646f
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/DBEventManagerRunner.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.IServer;
+import org.apache.airavata.db.event.manager.messaging.DBEventManagerMessagingFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Created by Ajinkya on 3/29/17.
+ */
+public class DBEventManagerRunner implements IServer {
+
+    private static final Logger log = LogManager.getLogger(DBEventManagerRunner.class);
+
+    private static final String SERVER_NAME = "DB Event Manager";
+    private static final String SERVER_VERSION = "1.0";
+
+    /** Status last recorded by {@link #start()}; remains {@code null} until start() has run. */
+    private ServerStatus status;
+
+    /**
+     * Creates the DB event publisher and then the DB event subscriber through
+     * {@link DBEventManagerMessagingFactory}.
+     *
+     * An {@link AiravataException} raised by either call is logged and not rethrown,
+     * so callers are not told about a failed start-up.
+     */
+    private void startDBEventManagerRunner() {
+        try {
+            log.info("Starting DB Event manager publisher");
+            DBEventManagerMessagingFactory.getDBEventPublisher();
+            log.debug("DB Event manager publisher is running");
+
+            log.info("Starting DB Event manager subscriber");
+            DBEventManagerMessagingFactory.getDBEventSubscriber();
+            log.debug("DB Event manager subscriber is listening");
+        } catch (AiravataException e) {
+            log.error("Error starting DB Event Manager.", e);
+        }
+    }
+
+    /**
+     * Launches a new thread that runs {@link #startDBEventManagerRunner()} on a fresh
+     * runner instance. Shared by {@link #main(String[])} and {@link #start()} so the
+     * two entry points cannot drift apart.
+     */
+    private static void launchWorkerThread() {
+        Runnable worker = new Runnable() {
+            @Override
+            public void run() {
+                new DBEventManagerRunner().startDBEventManagerRunner();
+            }
+        };
+
+        log.info("Starting the DB Event Manager runner.");
+        new Thread(worker).start();
+    }
+
+    /**
+     * Stand-alone entry point: starts the DB Event Manager on a worker thread.
+     *
+     * @param args the command line arguments (not used)
+     */
+    public static void main(String[] args) {
+        try {
+            launchWorkerThread();
+        } catch (Exception ex) {
+            log.error("Something went wrong with the DB Event Manager runner. Error: " + ex, ex);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return SERVER_NAME;
+    }
+
+    @Override
+    public String getVersion() {
+        return SERVER_VERSION;
+    }
+
+    /**
+     * Starts the DB Event Manager on a worker thread and records the outcome.
+     *
+     * The status becomes STARTED as soon as the thread has been launched; failures
+     * that happen later inside the worker are only logged there and do not change
+     * the status. FAILED is recorded only when launching the thread itself throws.
+     */
+    @Override
+    public void start() throws Exception {
+        try {
+            launchWorkerThread();
+            setStatus(ServerStatus.STARTED);
+        } catch (Exception ex) {
+            log.error("Something went wrong with the DB Event Manager runner. Error: " + ex, ex);
+            setStatus(ServerStatus.FAILED);
+        }
+    }
+
+    /** Currently a no-op. */
+    @Override
+    public void stop() throws Exception {
+        // TODO: implement stopping the DBEventManager
+    }
+
+    /** Calls {@link #stop()} followed by {@link #start()}. */
+    @Override
+    public void restart() throws Exception {
+        stop();
+        start();
+    }
+
+    /** No configuration is performed. */
+    @Override
+    public void configure() throws Exception {
+    }
+
+    /**
+     * @return the status recorded by the last {@link #start()} call, or {@code null}
+     *         when start() has not been called on this instance
+     */
+    @Override
+    public ServerStatus getStatus() throws Exception {
+        return status;
+    }
+
+    /** Stores the new status and calls {@code updateTime()} on it. */
+    private void setStatus(ServerStatus stat) {
+        status = stat;
+        status.updateTime();
+    }
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerException.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerException.java
new file mode 100644
index 0000000..fcdc03e
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerException.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.messaging;
+
+/**
+ * Created by Ajinkya on 3/14/17.
+ */
+public class DBEventManagerException extends Exception {
+
+    private static final long serialVersionUID = -2849422320139467602L;
+
+    /**
+     * Wraps an underlying failure.
+     *
+     * @param e the cause
+     */
+    public DBEventManagerException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception that carries only a message.
+     *
+     * @param message description of the failure
+     */
+    public DBEventManagerException(String message) {
+        // Use the message-only constructor: passing an explicit null cause would
+        // mark the cause as already set and make a later initCause() call throw
+        // IllegalStateException.
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a message and the underlying failure.
+     *
+     * @param message description of the failure
+     * @param e       the cause
+     */
+    public DBEventManagerException(String message, Throwable e) {
+        super(message, e);
+    }
+
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerMessagingFactory.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerMessagingFactory.java
new file mode 100644
index 0000000..517ea4e
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/DBEventManagerMessagingFactory.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.messaging;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.DBEventService;
+import org.apache.airavata.db.event.manager.messaging.impl.DBEventMessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Ajinkya on 3/1/17.
+ */
+public class DBEventManagerMessagingFactory {
+
+    private final static Logger log = LoggerFactory.getLogger(DBEventManagerMessagingFactory.class);
+
+    // Both fields are read outside the lock in the getters below, so they must be
+    // volatile for the unsynchronized null check to see a fully constructed object.
+    private static volatile Subscriber dbEventSubscriber;
+
+    private static volatile Publisher dbEventPublisher;
+
+    /**
+     * Returns the shared DB event subscriber, creating it on first use with a new
+     * {@link DBEventMessageHandler} and the {@code DBEventService.DB_EVENT} name.
+     *
+     * @return the shared subscriber instance
+     * @throws AiravataException if the subscriber (or its message handler) cannot be created
+     */
+    public static Subscriber getDBEventSubscriber() throws AiravataException {
+        if (null == dbEventSubscriber) {
+            synchronized (DBEventManagerMessagingFactory.class) {
+                // Re-check under the lock: another thread may have created it meanwhile.
+                if (null == dbEventSubscriber) {
+                    log.info("Creating DB Event subscriber.....");
+                    dbEventSubscriber = MessagingFactory.getDBEventSubscriber(new DBEventMessageHandler(), DBEventService.DB_EVENT.toString());
+                    log.info("DB Event subscriber created");
+                }
+            }
+        }
+        return dbEventSubscriber;
+    }
+
+    /**
+     * Returns the shared DB event publisher, creating it on first use.
+     *
+     * @return the shared publisher instance
+     * @throws AiravataException if the publisher cannot be created
+     */
+    public static Publisher getDBEventPublisher() throws AiravataException {
+        if (null == dbEventPublisher) {
+            synchronized (DBEventManagerMessagingFactory.class) {
+                // Re-check under the lock: another thread may have created it meanwhile.
+                if (null == dbEventPublisher) {
+                    log.info("Creating DB Event publisher.....");
+                    dbEventPublisher = MessagingFactory.getDBEventPublisher();
+                    log.info("DB Event publisher created");
+                }
+            }
+        }
+        return dbEventPublisher;
+    }
+
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/impl/DBEventMessageHandler.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/impl/DBEventMessageHandler.java
new file mode 100644
index 0000000..95fdea6
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/messaging/impl/DBEventMessageHandler.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.messaging.impl;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.db.event.manager.messaging.DBEventManagerException;
+import org.apache.airavata.db.event.manager.messaging.DBEventManagerMessagingFactory;
+import org.apache.airavata.db.event.manager.utils.DbEventManagerZkUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.dbevent.DBEventMessage;
+import org.apache.airavata.model.dbevent.DBEventMessageContext;
+import org.apache.airavata.model.dbevent.DBEventType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 3/14/17.
+ */
+public class DBEventMessageHandler implements MessageHandler {
+
+    private final static Logger log = LoggerFactory.getLogger(DBEventMessageHandler.class);
+    private CuratorFramework curatorClient;
+
+    /**
+     * Creates the handler and starts the shared Curator client.
+     *
+     * @throws ApplicationSettingsException if the Curator client cannot be obtained
+     */
+    public DBEventMessageHandler() throws ApplicationSettingsException {
+        startCuratorClient();
+    }
+
+    /** Fetches the Curator client from {@link DbEventManagerZkUtils} and starts it. */
+    private void startCuratorClient() throws ApplicationSettingsException {
+        curatorClient = DbEventManagerZkUtils.getCuratorClient();
+        curatorClient.start();
+    }
+
+    /**
+     * Handles one incoming DB event message.
+     *
+     * SUBSCRIBER messages register the subscriber under the publisher's ZooKeeper
+     * node. PUBLISHER messages are re-published with a routing key built from the
+     * publisher's registered subscribers. The message is acknowledged only when
+     * processing completes without an exception; failures are logged and the
+     * message is left unacknowledged.
+     *
+     * @param messageContext the received message
+     */
+    @Override
+    public void onMessage(MessageContext messageContext) {
+
+        log.info("Incoming DB event message. Message Id : {}", messageContext.getMessageId());
+        try {
+
+            // Round-trip the event through its serialized form to obtain a DBEventMessage.
+            byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+
+            DBEventMessage dbEventMessage = new DBEventMessage();
+            ThriftUtils.createThriftFromBytes(bytes, dbEventMessage);
+
+            DBEventMessageContext dBEventMessageContext = dbEventMessage.getMessageContext();
+
+            switch (dbEventMessage.getDbEventType()) {
+
+                case SUBSCRIBER:
+                    log.info("Registering {} subscriber for {}",
+                            dBEventMessageContext.getSubscriber().getSubscriberService(),
+                            dbEventMessage.getPublisherService());
+                    DbEventManagerZkUtils.createDBEventMgrZkNode(curatorClient, dbEventMessage.getPublisherService(), dBEventMessageContext.getSubscriber().getSubscriberService());
+                    break;
+                case PUBLISHER:
+                    List<String> subscribers = DbEventManagerZkUtils.getSubscribersForPublisher(curatorClient, dbEventMessage.getPublisherService());
+                    if (subscribers.isEmpty()) {
+                        log.error("No Subscribers registered for the service");
+                        throw new DBEventManagerException("No Subscribers registered for the service");
+                    }
+                    String routingKey = getRoutingKeyFromList(subscribers);
+                    log.info("Publishing {} db event to {}", dbEventMessage.getPublisherService(), subscribers);
+                    MessageContext messageCtx = new MessageContext(dbEventMessage, MessageType.DB_EVENT, "", "");
+                    messageCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                    DBEventManagerMessagingFactory.getDBEventPublisher().publish(messageCtx, routingKey);
+                    break;
+                default:
+                    // Nothing to route; make the skipped message visible instead of silently acking it.
+                    log.warn("Ignoring DB event message with unsupported type {}", dbEventMessage.getDbEventType());
+                    break;
+            }
+
+            log.info("Sending ack. Message Delivery Tag : {}", messageContext.getDeliveryTag());
+            DBEventManagerMessagingFactory.getDBEventSubscriber().sendAck(messageContext.getDeliveryTag());
+
+        } catch (Exception e) {
+            // NOTE(review): no ack is sent on this path; confirm the broker-side
+            // redelivery behaviour is the intended one.
+            log.error("Error processing message.", e);
+        }
+    }
+
+    /**
+     * Builds the routing key for a set of subscribers: the names in natural order,
+     * joined by {@code DBEventManagerConstants.ROUTING_KEY_SEPARATOR}.
+     *
+     * The input list is left untouched (a copy is sorted), the separator may be of
+     * any length, and an empty or {@code null} list yields an empty key.
+     *
+     * @param subscribers subscriber names
+     * @return the joined routing key
+     */
+    private String getRoutingKeyFromList(final List<String> subscribers) {
+        if (subscribers == null || subscribers.isEmpty()) {
+            return "";
+        }
+        List<String> sorted = new java.util.ArrayList<String>(subscribers);
+        Collections.sort(sorted);
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < sorted.size(); i++) {
+            if (i > 0) {
+                sb.append(DBEventManagerConstants.ROUTING_KEY_SEPARATOR);
+            }
+            sb.append(sorted.get(i));
+        }
+        return sb.toString();
+    }
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/Constants.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/Constants.java
new file mode 100644
index 0000000..e2db407
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/Constants.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Ajinkya on 3/1/17.
+ */
+public class Constants {
+
+    /** First path segment under which DB event manager nodes are created in ZooKeeper. */
+    public static final String DB_EVENT_MGR_ZK_PATH = "db-event-mgr";
+
+    /** Constants holder; not meant to be instantiated. */
+    private Constants() {
+    }
+}
diff --git a/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/DbEventManagerZkUtils.java b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/DbEventManagerZkUtils.java
new file mode 100644
index 0000000..b5ecfd1
--- /dev/null
+++ b/modules/db-event-manager/src/main/java/org/apache/airavata/db/event/manager/utils/DbEventManagerZkUtils.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.db.event.manager.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBEventManagerConstants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by goshenoy on 3/21/17.
+ */
+public class DbEventManagerZkUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(DbEventManagerZkUtils.class);
+
+    // Read outside the lock in getCuratorClient(), hence volatile.
+    private static volatile CuratorFramework curatorClient;
+
+    /**
+     * Returns the shared {@link CuratorFramework} instance, creating it on first use
+     * from the ZooKeeper connection string in {@link ServerSettings} with an
+     * exponential back-off retry policy (base sleep 1000 ms, at most 5 retries).
+     *
+     * The client is not started here; starting it is left to the caller.
+     *
+     * @return the shared Curator client
+     * @throws ApplicationSettingsException if the ZooKeeper connection setting cannot be read
+     */
+    public static CuratorFramework getCuratorClient() throws ApplicationSettingsException {
+        if (curatorClient == null) {
+            synchronized (DbEventManagerZkUtils.class) {
+                // Re-check under the lock: another thread may have created it meanwhile.
+                if (curatorClient == null) {
+                    String connectionString = ServerSettings.getZookeeperConnection();
+                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+                    curatorClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+                }
+            }
+        }
+
+        return curatorClient;
+    }
+
+    /**
+     * Creates the node {@code db-event-mgr/<publisherNode>/<subscriberNode>} (including
+     * missing parents) and stores each node's own name as its data.
+     *
+     * @param curatorClient  started Curator client to use (shadows the static field)
+     * @param publisherNode  name of the publisher node
+     * @param subscriberNode name of the subscriber node created below the publisher
+     * @throws Exception if a ZooKeeper operation fails
+     */
+    public static void createDBEventMgrZkNode(CuratorFramework curatorClient, String publisherNode, String subscriberNode) throws Exception {
+        // construct ZK paths for pub,sub
+        String publisherZkPath = ZKPaths.makePath(Constants.DB_EVENT_MGR_ZK_PATH, publisherNode);
+        String subscriberZkPath = ZKPaths.makePath(publisherZkPath, subscriberNode);
+
+        // Encode explicitly as UTF-8 so the stored bytes do not depend on the
+        // platform default charset of whichever JVM wrote them.
+        byte[] publisherZkData = publisherNode.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        byte[] subscriberZkData = subscriberNode.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+        // create zkNode: "/db-event-mgr/pubqueuename/subqueueename"
+        logger.debug("Creating Zk node for db-event-mgr: " + subscriberZkPath);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), subscriberZkPath);
+
+        // set zkNode data for pub,sub; version -1 means "overwrite whatever version is there"
+        curatorClient.setData().withVersion(-1).forPath(publisherZkPath, publisherZkData);
+        curatorClient.setData().withVersion(-1).forPath(subscriberZkPath, subscriberZkData);
+    }
+
+    /**
+     * Lists the subscribers registered for a publisher, i.e. the children of
+     * {@code db-event-mgr/<publisherNode>}.
+     *
+     * @param curatorClient started Curator client to use
+     * @param publisherNode name of the publisher node
+     * @return names of the publisher node's children
+     * @throws Exception if the ZooKeeper lookup fails
+     */
+    public static List<String> getSubscribersForPublisher(CuratorFramework curatorClient, String publisherNode) throws Exception {
+
+        // construct ZK path for pub
+        String publisherZkPath = ZKPaths.makePath(Constants.DB_EVENT_MGR_ZK_PATH, publisherNode);
+
+        // get children-list for pub
+        return curatorClient.getChildren().forPath(publisherZkPath);
+    }
+
+//    public static void main(String[] args) {
+//        String connectionString = "localhost:2181";
+//        String userProfileService = DBEventManagerConstants.DBEventService.USER_PROFILE.toString();
+//        String sharingService = DBEventManagerConstants.DBEventService.SHARING.toString();
+//        String registryService = DBEventManagerConstants.DBEventService.REGISTRY.toString();
+//
+//        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+//
+//        CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+//        curatorClient.start();
+//        try {
+//            DbEventManagerZkUtils.createDBEventMgrZkNode(curatorClient, userProfileService, sharingService);
+//            DbEventManagerZkUtils.createDBEventMgrZkNode(curatorClient, userProfileService, registryService);
+//            System.out.println(DbEventManagerZkUtils.getSubscribersForPublisher(curatorClient, userProfileService));
+//        } catch (Exception ex) {
+//            ex.printStackTrace();
+//        }
+//    }
+}
diff --git a/modules/db-event-manager/src/test/java/Test.java b/modules/db-event-manager/src/test/java/Test.java
new file mode 100644
index 0000000..fc019d9
--- /dev/null
+++ b/modules/db-event-manager/src/test/java/Test.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+/**
+ * Created by Ajinkya on 3/1/17.
+ */
+public class Test {
+    // Empty class: no test cases are declared here yet.
+}
diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml
index 00ebc0a..25d5b45 100644
--- a/modules/distribution/pom.xml
+++ b/modules/distribution/pom.xml
@@ -446,6 +446,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>db-event-manager</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<url>http://airavata.apache.org/</url>
diff --git a/pom.xml b/pom.xml
index add1bcf..8fc84a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -713,6 +713,7 @@
<module>modules/cluster-monitoring</module>
<module>modules/user-profile-migration</module>
<module>airavata-services</module>
+ <module>modules/db-event-manager</module>
<module>modules/airavata-helix</module>
<module>modules/compute-account-provisioning</module>
<module>modules/job-monitor</module>