You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/06/19 05:18:14 UTC

[celix] branch master updated: Refactors all pubsub admins to handle multiple subscribers per bundle (#257)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/master by this push:
     new 009b581  Refactors all pubsub admins to handle multiple subscribers per bundle (#257)
009b581 is described below

commit 009b581381790c9bd5d6184620a15d1f5a826e77
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Jun 19 07:18:08 2020 +0200

    Refactors all pubsub admins to handle multiple subscribers per bundle (#257)
    
    * Refactors all pubsub admins to support multiple subscriber services with the same topic/scope combination in a single bundle.
    
    * Fixes pubsub receive function so that they can handle multiple subscriber and releases.
---
 bundles/pubsub/CMakeLists.txt                      |  10 +-
 .../src/pubsub_tcp_topic_receiver.c                |  69 +++++++++-----
 .../src/pubsub_udpmc_topic_receiver.c              |  67 +++++++++-----
 .../src/pubsub_websocket_topic_receiver.c          |  59 ++++++++----
 .../src/pubsub_zmq_topic_receiver.c                |  65 ++++++++-----
 bundles/pubsub/test/CMakeLists.txt                 |   2 +-
 bundles/pubsub/test/test/tst_activator.c           |  56 ++++++++---
 bundles/pubsub/test/test/tst_endpoint_activator.c  | 102 ---------------------
 8 files changed, 221 insertions(+), 209 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index f4ac92a..90d762a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,19 +34,11 @@ if (PUBSUB)
         add_subdirectory(pubsub_admin_udp_mc)
     endif (BUILD_PUBSUB_PSA_UDP_MC)
 
-    set(BUILD_PUBSUB_PSA_WS_DEFAULT ON)
-    if (APPLE)
-        set(BUILD_PUBSUB_PSA_WS_DEFAULT OFF)
-    endif ()
-    option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ${BUILD_PUBSUB_PSA_WS_DEFAULT})
+    option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
         add_subdirectory(pubsub_admin_websocket)
-        if (APPLE)
-            message(WARNING "WebSocket PubSub Admin not supported on OSX, the tests are failing! See issue #161")
-        endif ()
     endif (BUILD_PUBSUB_PSA_WS)
 
-
     add_subdirectory(pubsub_api)
     add_subdirectory(pubsub_utils)
     add_subdirectory(pubsub_spi)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e327662..e9add2d 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -37,6 +37,7 @@
 #include "celix_utils_api.h"
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
+#include <celix_api.h>
 
 #define MAX_EPOLL_EVENTS     16
 #ifndef UUID_STR_LEN
@@ -110,11 +111,9 @@ typedef struct psa_tcp_subscriber_metrics_entry_t {
 } psa_tcp_subscriber_metrics_entry_t;
 
 typedef struct psa_tcp_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
-    hash_map_t
-        *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
-    pubsub_subscriber_t *svc;
+    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t*
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
@@ -308,6 +307,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -435,6 +435,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
         if (subScope != NULL) {
@@ -450,15 +451,16 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
         receiver->subscribers.allInitialized = false;
 
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
                                                            &entry->msgTypes);
 
@@ -489,13 +491,15 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void *) bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -510,6 +514,7 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
             hashMap_destroy(origins, true, true);
         }
         hashMap_destroy(entry->metrics, false, false);
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -520,7 +525,6 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                              const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
-    pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
     //monitoring
@@ -550,14 +554,30 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                             &release);
+                while (hashMapIterator_hasNext(&iter)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
+                                 &release);
+                    if (!release && hashMapIterator_hasNext(&iter)) {
+                        //receive function has taken ownership and still more receive function to come ..
+                        //deserialize again for new message
+                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
+                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                            break;
+                        }
+                        release = true;
+                    }
+                }
                 if (release) {
                     msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
                 }
-                if (message->metadata.metadata)
+                if (message->metadata.metadata) {
                     celix_properties_destroy(message->metadata.metadata);
+                }
                 updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
@@ -755,15 +775,20 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 3a93c78..2827e53 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -33,6 +33,7 @@
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
+#include <celix_api.h>
 #include "pubsub_udpmc_topic_receiver.h"
 #include "pubsub_psa_udpmc_constants.h"
 #include "large_udp.h"
@@ -94,10 +95,8 @@ typedef struct psa_udpmc_requested_connection_entry {
 } psa_udpmc_requested_connection_entry_t;
 
 typedef struct psa_udpmc_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
-
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_udpmc_subscriber_entry_t;
 
@@ -235,6 +234,7 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver)
                 if (receiver->serializer != NULL && entry->msgTypes != NULL) {
                     receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
                 }
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
         }
@@ -328,6 +328,7 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
     pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
         if (subScope != NULL) {
@@ -343,14 +344,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
         receiver->subscribers.allInitialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
@@ -367,19 +368,21 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
     pubsub_udpmc_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc =  receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
             fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -467,7 +470,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
             msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) msg->header.type);
         }
         if (msgSer == NULL) {
-            printf("[PSA_UDPMC] Serializer not available for message %d.\n", msg->header.type);
+            L_WARN("[PSA_UDPMC] Serializer not available for message %d.\n", msg->header.type);
         } else {
             void *msgInst = NULL;
             bool validVersion = psa_udpmc_checkVersion(msgSer->msgVersion, &msg->header);
@@ -479,22 +482,34 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
                 celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
 
                 if (status == CELIX_SUCCESS) {
+                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
                     bool release = true;
-                    pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
-
+                    while (hashMapIterator_hasNext(&iter2)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                        svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
+                        if (!release && hashMapIterator_hasNext(&iter2)) {
+                            //receive function has taken ownership and still more receive function to come ..
+                            //deserialize again for new message
+                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &msgInst);
+                            if (status != CELIX_SUCCESS) {
+                                L_WARN("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
+                                break;
+                            }
+                            release = true;
+                        }
+                    }
                     if (release) {
                         msgSer->freeDeserializeMsg(msgSer->handle, msgInst);
                     }
                 } else {
-                    printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
+                    L_WARN("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
                 }
 
             } else {
                 int major = 0, minor = 0;
                 version_getMajor(msgSer->msgVersion, &major);
                 version_getMinor(msgSer->msgVersion, &minor);
-                printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                L_WARN("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
                        msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
             }
 
@@ -589,15 +604,21 @@ static void psa_udpmc_initializeAllSubscribers(pubsub_udpmc_topic_receiver_t *re
         while (hashMapIterator_hasNext(&iter)) {
             psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 375177a..45da454 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -34,6 +34,7 @@
 #include <uuid/uuid.h>
 #include <http_admin/api.h>
 #include <jansson.h>
+#include <celix_api.h>
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN 37
@@ -107,9 +108,8 @@ typedef struct psa_websocket_requested_connection_entry {
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
-    pubsub_subscriber_t *svc;
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_websocket_subscriber_entry_t;
 
@@ -263,6 +263,7 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -394,6 +395,7 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
         if (subScope != NULL){
@@ -409,13 +411,13 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
 
@@ -433,19 +435,22 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
+
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
             L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -472,7 +477,6 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
     //NOTE receiver->subscribers.mutex locked
     void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, entry->msgTypes);
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
-    pubsub_subscriber_t *svc = entry->svc;
 
     if (msgSer!= NULL && msgTypeId != 0) {
         void *deSerializedMsg = NULL;
@@ -484,8 +488,22 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
             celix_status_t status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
 
             if (status == CELIX_SUCCESS) {
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+                while (hashMapIterator_hasNext(&iter)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
+                    if (!release && hashMapIterator_hasNext(&iter)) {
+                        //receive function has taken ownership and still more receive function to come ..
+                        //deserialize again for new message
+                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                            break;
+                        }
+                        release = true;
+                    }
+                }
                 if (release) {
                     msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
                 }
@@ -727,15 +745,20 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 0ec9d7a..7e0fff3 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -36,6 +36,7 @@
 
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
+#include <celix_api.h>
 
 #include "pubsub_interceptors_handler.h"
 
@@ -118,10 +119,9 @@ typedef struct psa_zmq_subscriber_metrics_entry_t {
 } psa_zmq_subscriber_metrics_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
-    int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
     hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_zmq_subscriber_metrics_entry_t*
-    pubsub_subscriber_t *svc;
+    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
@@ -295,6 +295,7 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+                hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
 
@@ -412,6 +413,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
         if (subScope != NULL){
@@ -427,13 +429,13 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount += 1;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     } else {
         //new create entry
         entry = calloc(1, sizeof(*entry));
-        entry->usageCount = 1;
-        entry->svc = svc;
+        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
+        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
 
@@ -461,13 +463,14 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
     if (entry != NULL) {
-        entry->usageCount -= 1;
+        hashMap_remove(entry->subscriberServices, (void*)svcId);
     }
-    if (entry != NULL && entry->usageCount <= 0) {
+    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
@@ -480,6 +483,7 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
             hashMap_destroy(origins, true, true);
         }
         hashMap_destroy(entry->metrics, false, false);
+        hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -488,7 +492,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
-    pubsub_subscriber_t *svc = entry->svc;
     bool monitor = receiver->metricsEnabled;
 
     //monitoring
@@ -517,16 +520,27 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 uint32_t msgId = message->header.msgId;
                 celix_properties_t *metadata = message->metadata.metadata;
                 bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata);
+                bool release = true;
                 if (cont) {
-                    bool release = true;
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg,
-                                 metadata, &release);
+                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter2)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+                        if (!release && hashMapIterator_hasNext(&iter2)) {
+                            //receive function has taken ownership and still more receive function to come ..
+                            //deserialize again for new message
+                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
+                            if (status != CELIX_SUCCESS) {
+                                L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                                break;
+                            }
+                            release = true;
+                        }
+                    }
                     if (release) {
                         msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
                     }
-
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-
                     updateReceiveCount += 1;
                 }
             } else {
@@ -778,15 +792,20 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                int rc = 0;
-                if (entry->svc != NULL && entry->svc->init != NULL) {
-                    rc = entry->svc->init(entry->svc->handle);
-                }
-                if (rc == 0) {
-                    entry->initialized = true;
-                } else {
-                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                    allInitialized = false;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    int rc = 0;
+                    if (svc != NULL && svc->init != NULL) {
+                        rc = svc->init(svc->handle);
+                    }
+                    if (rc == 0) {
+                        //note now only initialized on first subscriber entries added.
+                        entry->initialized = true;
+                    } else {
+                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                        allInitialized = false;
+                    }
                 }
             }
         }
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index 9282aca..ef73910 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut
 add_celix_bundle(pubsub_endpoint_tst
         #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
         SOURCES
-        test/tst_endpoint_activator.c
+        test/tst_activator.c
         VERSION 1.0.0
         )
 target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
diff --git a/bundles/pubsub/test/test/tst_activator.c b/bundles/pubsub/test/test/tst_activator.c
index b1df062..c930a9f 100644
--- a/bundles/pubsub/test/test/tst_activator.c
+++ b/bundles/pubsub/test/test/tst_activator.c
@@ -29,17 +29,22 @@
 #include "receive_count_service.h"
 
 static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
+static int tst_receive2(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
 static size_t tst_count(void *handle);
 
 struct activator {
-    pubsub_subscriber_t subSvc;
-    long subSvcId;
+    pubsub_subscriber_t subSvc1;
+    long subSvcId1;
+
+    pubsub_subscriber_t subSvc2;
+    long subSvcId2;
 
     celix_receive_count_service_t countSvc;
     long countSvcId;
 
     pthread_mutex_t mutex;
-    unsigned int count;
+    unsigned int count1;
+    unsigned int count2;
 };
 
 celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
@@ -48,9 +53,17 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
     {
         celix_properties_t *props = celix_properties_create();
         celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping");
-        act->subSvc.handle = act;
-        act->subSvc.receive = tst_receive;
-        act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
+        act->subSvc1.handle = act;
+        act->subSvc1.receive = tst_receive;
+        act->subSvcId1 = celix_bundleContext_registerService(ctx, &act->subSvc1, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
+    }
+
+    {
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping");
+        act->subSvc2.handle = act;
+        act->subSvc2.receive = tst_receive2;
+        act->subSvcId2 = celix_bundleContext_registerService(ctx, &act->subSvc2, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
     }
 
     {
@@ -63,7 +76,8 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
 }
 
 celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
-    celix_bundleContext_unregisterService(ctx, act->subSvcId);
+    celix_bundleContext_unregisterService(ctx, act->subSvcId1);
+    celix_bundleContext_unregisterService(ctx, act->subSvcId2);
     celix_bundleContext_unregisterService(ctx, act->countSvcId);
     pthread_mutex_destroy(&act->mutex);
     return CELIX_SUCCESS;
@@ -84,16 +98,36 @@ static int tst_receive(void *handle, const char * msgType __attribute__((unused)
     prevSeqNr = msg->seqNr;
 
     pthread_mutex_lock(&act->mutex);
-    act->count += 1;
+    act->count1 += 1;
+    pthread_mutex_unlock(&act->mutex);
+    return CELIX_SUCCESS;
+}
+
+static int tst_receive2(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata  __attribute__((unused)), bool *release  __attribute__((unused))) {
+    struct activator *act = handle;
+
+    msg_t *msg = voidMsg;
+    static int prevSeqNr = 0;
+    int delta = msg->seqNr - prevSeqNr;
+    if (delta != 1) {
+        fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
+    }
+    prevSeqNr = msg->seqNr;
+
+    pthread_mutex_lock(&act->mutex);
+    act->count2 += 1;
     pthread_mutex_unlock(&act->mutex);
     return CELIX_SUCCESS;
 }
 
 static size_t tst_count(void *handle) {
     struct activator *act = handle;
-    size_t count;
+    size_t count1;
+    size_t count2;
     pthread_mutex_lock(&act->mutex);
-    count = act->count;
+    count1 = act->count1;
+    count2 = act->count2;
     pthread_mutex_unlock(&act->mutex);
-    return count;
+    printf("msg count1 is %lu and msg count 2 is %lu\n", count1, count2);
+    return count1 >= count2 ? count1 : count2;
 }
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
deleted file mode 100644
index 66a9749..0000000
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "celix_api.h"
-#include "pubsub/api.h"
-
-#include "msg.h"
-#include "receive_count_service.h"
-
-
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
-static size_t tst_count(void *handle);
-
-struct activator {
-    pubsub_subscriber_t subSvc;
-    long subSvcId;
-
-    celix_receive_count_service_t countSvc;
-    long countSvcId;
-
-    pthread_mutex_t mutex;
-    unsigned int count;
-};
-
-celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
-    pthread_mutex_init(&act->mutex, NULL);
-
-    {
-        celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
-        act->subSvc.handle = act;
-        act->subSvc.receive = tst_receive;
-        act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);
-    }
-
-
-    {
-        act->countSvc.handle = act;
-        act->countSvc.receiveCount = tst_count;
-        act->countSvcId = celix_bundleContext_registerService(ctx, &act->countSvc, CELIX_RECEIVE_COUNT_SERVICE_NAME, NULL);
-    }
-
-
-    return CELIX_SUCCESS;
-}
-
-celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
-    celix_bundleContext_unregisterService(ctx, act->subSvcId);
-    celix_bundleContext_unregisterService(ctx, act->countSvcId);
-    pthread_mutex_destroy(&act->mutex);
-    return CELIX_SUCCESS;
-}
-
-CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
-
-
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata  __attribute__((unused)), bool *release  __attribute__((unused))) {
-    struct activator *act = handle;
-
-    msg_t *msg = voidMsg;
-    static int prevSeqNr = 0;
-    int delta = msg->seqNr - prevSeqNr;
-    if (delta != 1) {
-       fprintf(stderr, "Warning: missing messages. seq jumped from %i to %i\n", prevSeqNr, msg->seqNr);
-    }
-    prevSeqNr = msg->seqNr;
-
-    pthread_mutex_lock(&act->mutex);
-    act->count += 1;
-    pthread_mutex_unlock(&act->mutex);
-    return CELIX_SUCCESS;
-}
-
-static size_t tst_count(void *handle) {
-    struct activator *act = handle;
-    size_t count;
-    pthread_mutex_lock(&act->mutex);
-    count = act->count;
-    pthread_mutex_unlock(&act->mutex);
-    return count;
-}