You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/17 18:38:53 UTC

[celix] branch feature/store_pubsub_subscriber_with_svcid created (now 9a1c906)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a change to branch feature/store_pubsub_subscriber_with_svcid
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at 9a1c906  Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.

This branch includes the following new commits:

     new 9a1c906  Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[celix] 01/01: Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/store_pubsub_subscriber_with_svcid
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9a1c906b54c6cd38ce5820b27a432e4c20678bc2
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed Jun 17 20:38:37 2020 +0200

    Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.
---
 bundles/pubsub/CMakeLists.txt                      | 10 +---
 .../src/pubsub_tcp_topic_receiver.c                | 66 +++++++++++++---------
 .../src/pubsub_udpmc_topic_receiver.c              | 57 +++++++++++--------
 .../src/pubsub_websocket_topic_receiver.c          | 55 +++++++++++-------
 .../src/pubsub_zmq_topic_receiver.c                | 60 ++++++++++++--------
 5 files changed, 144 insertions(+), 104 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index f4ac92a..90d762a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,19 +34,11 @@ if (PUBSUB)
         add_subdirectory(pubsub_admin_udp_mc)
     endif (BUILD_PUBSUB_PSA_UDP_MC)
 
-    set(BUILD_PUBSUB_PSA_WS_DEFAULT ON)
-    if (APPLE)
-        set(BUILD_PUBSUB_PSA_WS_DEFAULT OFF)
-    endif ()
-    option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ${BUILD_PUBSUB_PSA_WS_DEFAULT})
+    option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
         add_subdirectory(pubsub_admin_websocket)
-        if (APPLE)
-            message(WARNING "WebSocket PubSub Admin not supported on OSX, the tests are failing! See issue #161")
-        endif ()
     endif (BUILD_PUBSUB_PSA_WS)
 
-
     add_subdirectory(pubsub_api)
     add_subdirectory(pubsub_utils)
     add_subdirectory(pubsub_spi)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e327662..442c087 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -37,6 +37,7 @@
 #include "celix_utils_api.h"
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
+#include <celix_api.h>
 
 #define MAX_EPOLL_EVENTS     16
 #ifndef UUID_STR_LEN
@@ -110,11 +111,9 @@ typedef struct psa_tcp_subscriber_metrics_entry_t {
 } psa_tcp_subscriber_metrics_entry_t;
 
 typedef struct psa_tcp_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
-    hash_map_t
-        *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
-    pubsub_subscriber_t *svc;
+    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
@@ -308,6 +307,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -435,6 +435,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
         if (subScope != NULL) {
@@ -450,15 +451,16 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
         receiver->subscribers.allInitialized = false;
 
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
                                                            &entry->msgTypes);
 
@@ -489,13 +491,15 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void *) bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -510,6 +514,7 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
             hashMap_destroy(origins, true, true);
         }
         hashMap_destroy(entry->metrics, false, false);
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -520,7 +525,6 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                              const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
-    pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
     //monitoring
@@ -550,14 +554,19 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                             &release);
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter)) {
+                    bool release = true;
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
+                                 &release);
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
+                    if (message->metadata.metadata) {
+                        celix_properties_destroy(message->metadata.metadata);
+                    }
                 }
-                if (message->metadata.metadata)
-                    celix_properties_destroy(message->metadata.metadata);
                 updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
@@ -755,15 +764,20 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 3a93c78..10a76a2 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -33,6 +33,7 @@
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
+#include <celix_api.h>
 #include "pubsub_udpmc_topic_receiver.h"
 #include "pubsub_psa_udpmc_constants.h"
 #include "large_udp.h"
@@ -94,10 +95,8 @@ typedef struct psa_udpmc_requested_connection_entry {
 } psa_udpmc_requested_connection_entry_t;
 
 typedef struct psa_udpmc_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
-
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_udpmc_subscriber_entry_t;
 
@@ -235,6 +234,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver)
                 if (receiver->serializer != NULL && entry->msgTypes != NULL) {
                     receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
                 }
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
         }
@@ -328,6 +328,7 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
     pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
         if (subScope != NULL) {
@@ -343,14 +344,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
         receiver->subscribers.allInitialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
@@ -367,19 +368,21 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
     pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc =  receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
             fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -479,12 +482,14 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
                 celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
 
                 if (status == CELIX_SUCCESS) {
-                    bool release = true;
-                    pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
-
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
+                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter2)) {
+                        bool release = true;
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                        svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
+                        if (release) {
+                            msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
+                        }
                     }
                 } else {
                     printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
@@ -589,15 +594,21 @@ static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *re
         while (hashMapIterator_hasNext(&iter)) {
             psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 375177a..0c991c5 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -34,6 +34,7 @@
 #include <uuid/uuid.h>
 #include <http_admin/api.h>
 #include <jansson.h>
+#include <celix_api.h>
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN 37
@@ -107,9 +108,8 @@ typedef struct psa_websocket_requested_connection_entry {
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
-    pubsub_subscriber_t *svc;
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_websocket_subscriber_entry_t;
 
@@ -263,6 +263,7 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -394,6 +395,7 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
         if (subScope != NULL){
@@ -409,13 +411,13 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
 
@@ -433,19 +435,22 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
             L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -472,7 +477,6 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
     //NOTE receiver->subscribers.mutex locked
     void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, entry->msgTypes);
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
-    pubsub_subscriber_t *svc = entry->svc;
 
     if (msgSer!= NULL && msgTypeId != 0) {
         void *deSerializedMsg = NULL;
@@ -484,10 +488,14 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
             celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
 
             if (status == CELIX_SUCCESS) {
-                bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter)) {
+                    bool release = true;
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
                 }
             } else {
                 L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
@@ -727,15 +735,20 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..af9a2b7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -36,6 +36,7 @@
 
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
+#include <celix_api.h>
 
 #include "pubsub_interceptors_handler.h"
 
@@ -118,10 +119,9 @@ typedef struct psa_zmq_subscriber_metrics_entry_t {
 } psa_zmq_subscriber_metrics_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
     hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_zmq_subscriber_metrics_entry_t*
-    pubsub_subscriber_t *svc;
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
@@ -295,6 +295,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -412,6 +413,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
         if (subScope != NULL){
@@ -427,13 +429,13 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
 
@@ -461,13 +463,14 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -480,6 +483,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
             hashMap_destroy(origins, true, true);
         }
         hashMap_destroy(entry->metrics, false, false);
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -488,7 +492,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
-    pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
     //monitoring
@@ -518,15 +521,17 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 celix_properties_t *metadata = message->metadata.metadata;
                 bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata);
                 if (cont) {
-                    bool release = true;
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
-                                 metadata, &release);
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter2)) {
+                        bool release = true;
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
+                                     metadata, &release);
+                        if (release) {
+                            msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+                        }
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
                     }
-
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-
                     updateReceiveCount += 1;
                 }
             } else {
@@ -778,15 +783,20 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }