You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/17 18:38:54 UTC

[celix] 01/01: Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/store_pubsub_subscriber_with_svcid
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9a1c906b54c6cd38ce5820b27a432e4c20678bc2
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed Jun 17 20:38:37 2020 +0200

    Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.
---
 bundles/pubsub/CMakeLists.txt                      | 10 +---
 .../src/pubsub_tcp_topic_receiver.c                | 66 +++++++++++++---------
 .../src/pubsub_udpmc_topic_receiver.c              | 57 +++++++++++--------
 .../src/pubsub_websocket_topic_receiver.c          | 55 +++++++++++-------
 .../src/pubsub_zmq_topic_receiver.c                | 60 ++++++++++++--------
 5 files changed, 144 insertions(+), 104 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index f4ac92a..90d762a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,19 +34,11 @@ if (PUBSUB)
         add_subdirectory(pubsub_admin_udp_mc)
     endif (BUILD_PUBSUB_PSA_UDP_MC)
 
-    set(BUILD_PUBSUB_PSA_WS_DEFAULT ON)
-    if (APPLE)
-        set(BUILD_PUBSUB_PSA_WS_DEFAULT OFF)
-    endif ()
-    option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ${BUILD_PUBSUB_PSA_WS_DEFAULT})
+    option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
         add_subdirectory(pubsub_admin_websocket)
-        if (APPLE)
-            message(WARNING "WebSocket PubSub Admin not supported on OSX, the tests are failing! See issue #161")
-        endif ()
     endif (BUILD_PUBSUB_PSA_WS)
 
-
     add_subdirectory(pubsub_api)
     add_subdirectory(pubsub_utils)
     add_subdirectory(pubsub_spi)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e327662..442c087 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -37,6 +37,7 @@
 #include "celix_utils_api.h"
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
+#include <celix_api.h>
 
 #define MAX_EPOLL_EVENTS     16
 #ifndef UUID_STR_LEN
@@ -110,11 +111,9 @@ typedef struct psa_tcp_subscriber_metrics_entry_t {
 } psa_tcp_subscriber_metrics_entry_t;
 
 typedef struct psa_tcp_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
-    hash_map_t
-        *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
-    pubsub_subscriber_t *svc;
+    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
@@ -308,6 +307,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -435,6 +435,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
         if (subScope != NULL) {
@@ -450,15 +451,16 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
         receiver->subscribers.allInitialized = false;
 
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
                                                            &entry->msgTypes);
 
@@ -489,13 +491,15 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void *) bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -510,6 +514,7 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
             hashMap_destroy(origins, true, true);
         }
         hashMap_destroy(entry->metrics, false, false);
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -520,7 +525,6 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                              const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
-    pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
     //monitoring
@@ -550,14 +554,19 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                             &release);
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter)) {
+                    bool release = true;
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
+                                 &release);
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
+                    if (message->metadata.metadata) {
+                        celix_properties_destroy(message->metadata.metadata);
+                    }
                 }
-                if (message->metadata.metadata)
-                    celix_properties_destroy(message->metadata.metadata);
                 updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
@@ -755,15 +764,20 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 3a93c78..10a76a2 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -33,6 +33,7 @@
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
+#include <celix_api.h>
 #include "pubsub_udpmc_topic_receiver.h"
 #include "pubsub_psa_udpmc_constants.h"
 #include "large_udp.h"
@@ -94,10 +95,8 @@ typedef struct psa_udpmc_requested_connection_entry {
 } psa_udpmc_requested_connection_entry_t;
 
 typedef struct psa_udpmc_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
-
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_udpmc_subscriber_entry_t;
 
@@ -235,6 +234,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver)
                 if (receiver->serializer != NULL && entry->msgTypes != NULL) {
                     receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
                 }
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
         }
@@ -328,6 +328,7 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
     pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
         if (subScope != NULL) {
@@ -343,14 +344,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
         receiver->subscribers.allInitialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
@@ -367,19 +368,21 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
     pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc =  receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
             fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -479,12 +482,14 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
                 celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
 
                 if (status == CELIX_SUCCESS) {
-                    bool release = true;
-                    pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
-
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
+                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter2)) {
+                        bool release = true;
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                        svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
+                        if (release) {
+                            msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
+                        }
                     }
                 } else {
                     printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
@@ -589,15 +594,21 @@ static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *re
         while (hashMapIterator_hasNext(&iter)) {
             psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 375177a..0c991c5 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -34,6 +34,7 @@
 #include <uuid/uuid.h>
 #include <http_admin/api.h>
 #include <jansson.h>
+#include <celix_api.h>
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN 37
@@ -107,9 +108,8 @@ typedef struct psa_websocket_requested_connection_entry {
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
-    pubsub_subscriber_t *svc;
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_websocket_subscriber_entry_t;
 
@@ -263,6 +263,7 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -394,6 +395,7 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
         if (subScope != NULL){
@@ -409,13 +411,13 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
 
@@ -433,19 +435,22 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
             L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -472,7 +477,6 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
     //NOTE receiver->subscribers.mutex locked
     void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, entry->msgTypes);
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
-    pubsub_subscriber_t *svc = entry->svc;
 
     if (msgSer!= NULL && msgTypeId != 0) {
         void *deSerializedMsg = NULL;
@@ -484,10 +488,14 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
             celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
 
             if (status == CELIX_SUCCESS) {
-                bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter)) {
+                    bool release = true;
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
                 }
             } else {
                 L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
@@ -727,15 +735,20 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..af9a2b7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -36,6 +36,7 @@
 
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
+#include <celix_api.h>
 
 #include "pubsub_interceptors_handler.h"
 
@@ -118,10 +119,9 @@ typedef struct psa_zmq_subscriber_metrics_entry_t {
 } psa_zmq_subscriber_metrics_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
     hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_zmq_subscriber_metrics_entry_t*
-    pubsub_subscriber_t *svc;
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
@@ -295,6 +295,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -412,6 +413,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
         if (subScope != NULL){
@@ -427,13 +429,13 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
 
@@ -461,13 +463,14 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -480,6 +483,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
             hashMap_destroy(origins, true, true);
         }
         hashMap_destroy(entry->metrics, false, false);
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -488,7 +492,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
-    pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
     //monitoring
@@ -518,15 +521,17 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 celix_properties_t *metadata = message->metadata.metadata;
                 bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata);
                 if (cont) {
-                    bool release = true;
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
-                                 metadata, &release);
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter2)) {
+                        bool release = true;
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
+                                     metadata, &release);
+                        if (release) {
+                            msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+                        }
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
                     }
-
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-
                     updateReceiveCount += 1;
                 }
             } else {
@@ -778,15 +783,20 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }