You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/06/28 19:42:57 UTC

[celix] 06/07: Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 461a2cea194c0bc203e03af38ce21a9a15d48182
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Sun Jun 27 22:36:30 2021 +0200

    Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1.
---
 bundles/pubsub/CMakeLists.txt                      |   9 +-
 bundles/pubsub/integration/CMakeLists.txt          |   4 +-
 bundles/pubsub/integration/src/tst_activator.c     |   6 +-
 bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt  |  22 +--
 .../v1/src/pubsub_tcp_topic_receiver.c             |   6 +-
 .../v2/src/pubsub_tcp_topic_receiver.c             | 183 +++++++++------------
 .../pubsub_admin_websocket/v1/CMakeLists.txt       |  20 ++-
 .../v1/src/pubsub_websocket_topic_receiver.c       |   5 +-
 bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt  |  20 ++-
 .../v1/src/pubsub_zmq_topic_receiver.c             |  59 +------
 .../v2/src/pubsub_zmq_topic_receiver.c             | 172 +++++++++----------
 .../include/pubsub_interceptors_handler.h          |   2 +-
 .../pubsub_spi/src/pubsub_interceptors_handler.c   |  41 ++---
 13 files changed, 232 insertions(+), 317 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 9285195..482ee3a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -21,14 +21,16 @@ if (PUBSUB)
     option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" ON)
     if (BUILD_PUBSUB_PSA_ZMQ)
         option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
-        add_subdirectory(pubsub_admin_zmq/v1)
+        add_subdirectory(pubsub_admin_zmq/v1) #TODO option for v1 admins
         add_subdirectory(pubsub_admin_zmq/v2)
+        add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq_v1) #TODO move to config and set to v2
     endif (BUILD_PUBSUB_PSA_ZMQ)
 
     option(BUILD_PUBSUB_PSA_TCP "Build TCP PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_TCP)
-        add_subdirectory(pubsub_admin_tcp/v1)
+        add_subdirectory(pubsub_admin_tcp/v1) #TODO option for v1 admins
         add_subdirectory(pubsub_admin_tcp/v2)
+        add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp_v1) #TODO move to config and set to v2
     endif (BUILD_PUBSUB_PSA_TCP)
 
     option(BUILD_PUBSUB_PSA_UDP_MC "Build UDP MC PubSub Admin" ON)
@@ -38,8 +40,9 @@ if (PUBSUB)
 
     option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
-        add_subdirectory(pubsub_admin_websocket/v1)
+        add_subdirectory(pubsub_admin_websocket/v1) #TODO option for v1 admins
         add_subdirectory(pubsub_admin_websocket/v2)
+        add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket_v1) #TODO move to config and set to v2
     endif (BUILD_PUBSUB_PSA_WS)
 
     add_subdirectory(pubsub_api)
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index a134b74..9c9ce54 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -793,11 +793,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
     celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
     celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_admin_zmq PUBSUB_ZMQ_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
     celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
+    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq_v2 Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
     target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
             PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
             PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
diff --git a/bundles/pubsub/integration/src/tst_activator.c b/bundles/pubsub/integration/src/tst_activator.c
index 8ce13ac..4017326 100644
--- a/bundles/pubsub/integration/src/tst_activator.c
+++ b/bundles/pubsub/integration/src/tst_activator.c
@@ -86,7 +86,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
 CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
 
 
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata  __attribute__((unused)), bool *release  __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata  __attribute__((unused)), bool *release) {
     struct activator *act = handle;
 
     msg_t *msg = voidMsg;
@@ -100,6 +100,10 @@ static int tst_receive(void *handle, const char * msgType __attribute__((unused)
     pthread_mutex_lock(&act->mutex);
     act->count1 += 1;
     pthread_mutex_unlock(&act->mutex);
+
+    *release = false;
+    free(voidMsg);
+
     return CELIX_SUCCESS;
 }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
index 0314cb6..2f1bca8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
@@ -15,10 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
+message(WARNING "PubSub TCP Admin V1 is deprecated, use PubSub TCP Websocket v2 instead")
+
 find_package(UUID REQUIRED)
 
-add_celix_bundle(celix_pubsub_admin_tcp
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp"
+add_celix_bundle(celix_pubsub_admin_tcp_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp_v1"
     VERSION "1.0.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -30,15 +32,15 @@ add_celix_bundle(celix_pubsub_admin_tcp
         src/pubsub_tcp_common.c
 )
 
-set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::framework Celix::dfi Celix::log_helper)
-target_include_directories(celix_pubsub_admin_tcp PRIVATE src)
+set_target_properties(celix_pubsub_admin_tcp_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::framework Celix::dfi Celix::log_helper)
+target_include_directories(celix_pubsub_admin_tcp_v1 PRIVATE src)
 # cmake find package UUID set the wrong include dir for OSX
 if (NOT APPLE)
-    target_link_libraries(celix_pubsub_admin_tcp PRIVATE UUID::lib)
+    target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE UUID::lib)
 endif()
 
-install_celix_bundle(celix_pubsub_admin_tcp EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp)
+install_celix_bundle(celix_pubsub_admin_tcp_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_tcp_v1 ALIAS celix_pubsub_admin_tcp_v1)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index b1b2c58..8acb8e2 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -560,9 +560,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                         pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
                         svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
                         pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
-                        if (!release && hashMapIterator_hasNext(&iter)) {
-                            //receive function has taken ownership and still more receive function to come ..
-                            //deserialize again for new message
+                        if (!release) {
+                            //receive function has taken ownership, deserialize again for new message
                             status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
                             if (status != CELIX_SUCCESS) {
                                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
@@ -574,6 +573,7 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                             release = true;
                         }
                     }
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
                     if (release) {
                         msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
                     }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index a49ff23..ad321e8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -79,7 +79,7 @@ struct pubsub_tcp_topic_receiver {
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
+        hash_map_t *map; //key = long svc id, value = psa_tcp_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -92,12 +92,12 @@ typedef struct psa_tcp_requested_connection_entry {
 } psa_tcp_requested_connection_entry_t;
 
 typedef struct psa_tcp_subscriber_entry {
-    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+    pubsub_subscriber_t* subscriberSvc;
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
 static void *psa_tcp_recvThread(void *data);
 static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
@@ -229,8 +229,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = receiver;
-        opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
+        opts.addWithProperties = pubsub_tcpTopicReceiver_addSubscriber;
+        opts.removeWithProperties = pubsub_tcpTopicReceiver_removeSubscriber;
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
 
@@ -259,19 +259,11 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
         celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hashMap_destroy(entry->subscriberServices, false, false);
-                free(entry);
-            }
-        }
-        hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.map, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
@@ -394,11 +386,9 @@ void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                  const celix_bundle_t *bnd) {
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
@@ -415,62 +405,78 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
         return;
     }
 
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *)bndId);
-    if (entry != NULL) {
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-    } else {
-        //new create entry
-        entry = calloc(1, sizeof(*entry));
-        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->initialized = false;
-        receiver->subscribers.allInitialized = false;
+    psa_tcp_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+    entry->subscriberSvc = svc;
+    entry->initialized = false;
 
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-        hashMap_put(receiver->subscribers.map, (void *)bndId, entry);
-    }
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props,
-                                                     const celix_bundle_t *bnd) {
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
-    if (entry != NULL) {
-        hashMap_remove(entry->subscriberServices, (void*)svcId);
-    }
-    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
-        //remove entry
-        hashMap_remove(receiver->subscribers.map, (void *) bndId);
-        hashMap_destroy(entry->subscriberServices, false, false);
-        free(entry);
+    psa_tcp_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void *)svcId);
+    free(entry);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+    *release = true;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+            entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
+            if (!(*release)) {
+                //receive function has taken ownership, deserialize again for new message
+                struct iovec deSerializeBuffer;
+                deSerializeBuffer.iov_base = message->payload.payload;
+                deSerializeBuffer.iov_len = message->payload.length;
+                celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+                                                                             message->header.msgId,
+                                                                             message->header.msgMajorVersion,
+                                                                             message->header.msgMinorVersion,
+                                                                             &deSerializeBuffer, 0, msg);
+                if (status != CELIX_SUCCESS) {
+                    L_WARN("[PSA_TCO_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                           receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                    break;
+                }
+            }
+            *release = true;
+        }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void
-processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
-                             const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
-    //NOTE receiver->subscribers.mutex locked
+static inline void processMsg(void* handle, const pubsub_protocol_message_t *message, bool* releaseMsg, struct timespec *receiveTime) {
+    pubsub_tcp_topic_receiver_t *receiver = handle;
 
-    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+    const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
     if (msgFqn == NULL) {
         L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
         return;
     }
-
     void *deSerializedMsg = NULL;
-    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId,
+                                                                    message->header.msgMajorVersion,
+                                                                    message->header.msgMinorVersion);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
         deSerializeBuffer.iov_len = message->payload.length;
-        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+                                                                     message->header.msgMajorVersion,
+                                                                     message->header.msgMinorVersion,
+                                                                     &deSerializeBuffer, 0, &deSerializedMsg);
+
         // When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
         if (message->payload.payload == deSerializedMsg) {
             *releaseMsg = true;
@@ -479,33 +485,15 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
-            bool release = true;
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
+                                                                  deSerializedMsg, &metadata);
             if (cont) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
-                                   msgFqn,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope,
-                                   receiver->topic);
-                            break;
-                        }
-                        release = true;
-                    }
-                }
+                bool release;
+                callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
-                }
-                if (message->metadata.metadata) {
-                    celix_properties_destroy(message->metadata.metadata);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
+                                                                 deSerializedMsg);
                 }
             }
         } else {
@@ -516,27 +504,13 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
         L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
                msgFqn,
                pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
-               (int)message->header.msgMajorVersion,
-               (int)message->header.msgMinorVersion,
+               (int) message->header.msgMajorVersion,
+               (int) message->header.msgMinorVersion,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
                pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
 }
 
-static void
-processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime) {
-    pubsub_tcp_topic_receiver_t *receiver = handle;
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, message, release, receiveTime);
-        }
-    }
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
-
 static void *psa_tcp_recvThread(void *data) {
     pubsub_tcp_topic_receiver_t *receiver = data;
 
@@ -642,20 +616,15 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    int rc = 0;
-                    if (svc != NULL && svc->init != NULL) {
-                        rc = svc->init(svc->handle);
-                    }
-                    if (rc == 0) {
-                        //note now only initialized on first subscriber entries added.
-                        entry->initialized = true;
-                    } else {
-                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                        allInitialized = false;
-                    }
+                int rc = 0;
+                if (entry->subscriberSvc->init != NULL) {
+                    rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
index f6397e0..021310f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+message(WARNING "PubSub Websocket Admin V1 is deprecated, use PubSub ZMQ Websocket v2 instead")
+
 find_package(Jansson REQUIRED)
 find_package(UUID REQUIRED)
 
-add_celix_bundle(celix_pubsub_admin_websocket
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+add_celix_bundle(celix_pubsub_admin_websocket_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket_v1"
     VERSION "1.0.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -30,16 +32,16 @@ add_celix_bundle(celix_pubsub_admin_websocket
         src/pubsub_websocket_common.c
 )
 
-set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE
+set_target_properties(celix_pubsub_admin_websocket_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE
         Celix::framework Celix::dfi Celix::log_helper Celix::utils
         Celix::http_admin_api
 )
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_websocket PRIVATE
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_websocket_v1 PRIVATE
     src
 )
 
-install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
+install_celix_bundle(celix_pubsub_admin_websocket_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket_v1 ALIAS celix_pubsub_admin_websocket_v1)
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
index 7d8cd00..7b2cfcc 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
@@ -500,9 +500,8 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
                 while (hashMapIterator_hasNext(&iter)) {
                     pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
                     svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
+                    if (!release) {
+                        //receive function has taken ownership, deserialize again for new message
                         status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
                         if (status != CELIX_SUCCESS) {
                             L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
index 52723b9..bfd8eef 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+message(WARNING "PubSub ZMQ Admin V1 is deprecated, use PubSub ZMQ Admin v2 instead")
+
 find_package(ZMQ REQUIRED)
 find_package(CZMQ REQUIRED)
 find_package(Jansson REQUIRED)
@@ -31,8 +33,8 @@ if (BUILD_ZMQ_SECURITY)
     set (ZMQ_CRYPTO_C "src/zmq_crypto.c")
 endif()
 
-add_celix_bundle(celix_pubsub_admin_zmq
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
+add_celix_bundle(celix_pubsub_admin_zmq_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq_v1"
     VERSION "1.1.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -43,17 +45,17 @@ add_celix_bundle(celix_pubsub_admin_zmq
         ${ZMQ_CRYPTO_C}
 )
 
-set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE
+set_target_properties(celix_pubsub_admin_zmq_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE
 
         Celix::framework Celix::dfi Celix::log_helper Celix::utils
         ZMQ::lib CZMQ::lib ${OPTIONAL_OPENSSL_LIB}
 )
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_zmq PRIVATE
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_zmq_v1 PRIVATE
     src
 )
 
-install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
+install_celix_bundle(celix_pubsub_admin_zmq_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::shell_api)
+add_library(Celix::celix_pubsub_admin_zmq_v1 ALIAS celix_pubsub_admin_zmq_v1)
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 11f2b8f..9cd99f4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -533,10 +533,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                     while (hashMapIterator_hasNext(&iter2)) {
                         pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
                         svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-                        if (!release && hashMapIterator_hasNext(&iter2)) {
-                            //receive function has taken ownership and still more receive function to come ..
-                            //deserialize again for new message
+                        if (!release) {
+                            //receive function has taken ownership deserialize again for new message
                             status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
                             if (status != CELIX_SUCCESS) {
                                 L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
@@ -544,6 +542,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                             }
                             release = true;
                         }
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
                     }
                     if (release) {
                         msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
@@ -558,58 +557,6 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     } else {
         L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
     }
-
-    if (msgSer != NULL && monitor) {
-        // TODO disabled for now, should move to an interceptor?
-//        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )message->header.msgId);
-//        char uuidStr[UUID_STR_LEN+1];
-//        uuid_unparse(hdr->originUUID, uuidStr);
-//        psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
-//
-//        if (metrics == NULL) {
-//            metrics = calloc(1, sizeof(*metrics));
-//            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
-//            uuid_copy(metrics->origin, hdr->originUUID);
-//            metrics->msgTypeId = hdr->type;
-//            metrics->maxDelayInSeconds = -INFINITY;
-//            metrics->minDelayInSeconds = INFINITY;
-//            metrics->lastSeqNr = 0;
-//        }
-//
-//        double diff = celix_difftime(&beginSer, &endSer);
-//        long n = metrics->nrOfMessagesReceived;
-//        metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-//
-//        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
-//        n = metrics->nrOfMessagesReceived;
-//        if (metrics->nrOfMessagesReceived >= 1) {
-//            metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
-//        }
-//        metrics->lastMessageReceived = *receiveTime;
-//
-//
-//        int incr = hdr->seqNr - metrics->lastSeqNr;
-//        if (metrics->lastSeqNr >0 && incr > 1) {
-//            metrics->nrOfMissingSeqNumbers += (incr - 1);
-//            L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
-//        }
-//        metrics->lastSeqNr = hdr->seqNr;
-//
-//        struct timespec sendTime;
-//        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
-//        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
-//        diff = celix_difftime(&sendTime, receiveTime);
-//        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
-//        if (diff < metrics->minDelayInSeconds) {
-//            metrics->minDelayInSeconds = diff;
-//        }
-//        if (diff > metrics->maxDelayInSeconds) {
-//            metrics->maxDelayInSeconds = diff;
-//        }
-//
-//        metrics->nrOfMessagesReceived += updateReceiveCount;
-//        metrics->nrOfSerializationErrors += updateSerError;
-    }
 }
 
 static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 90e9510..0fc2bc2 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -93,7 +93,7 @@ struct pubsub_zmq_topic_receiver {
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+        hash_map_t *map; //key = long svc id, value = psa_zmq_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -105,13 +105,13 @@ typedef struct psa_zmq_requested_connection_entry {
 } psa_zmq_requested_connection_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
-    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+    pubsub_subscriber_t* subscriberSvc;
     bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
 static void* psa_zmq_recvThread(void * data);
 static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
 static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
@@ -237,8 +237,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = receiver;
-        opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_zmqTopicReceiver_removeSubscriber;
+        opts.addWithProperties = pubsub_zmqTopicReceiver_addSubscriber;
+        opts.removeWithProperties = pubsub_zmqTopicReceiver_removeSubscriber;
 
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
@@ -275,19 +275,11 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
         celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL)  {
-                hashMap_destroy(entry->subscriberServices, false, false);
-                free(entry);
-            }
-        }
-        hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.map, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
@@ -384,10 +376,9 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receive
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
@@ -404,107 +395,104 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         return;
     }
 
+    psa_zmq_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+    entry->subscriberSvc = svc;
+    entry->initialized = false;
+
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-    } else {
-        //new create entry
-        entry = calloc(1, sizeof(*entry));
-        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->initialized = false;
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-    }
+    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_remove(entry->subscriberServices, (void*)svcId);
-    }
-    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
-        //remove entry
-        hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        hashMap_destroy(entry->subscriberServices, false, false);
-        free(entry);
+    psa_zmq_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId);
+    free(entry);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void callReceivers(pubsub_zmq_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+    *release = true;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_zmq_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+            entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
+            if (!(*release)) {
+                //receive function has taken ownership, deserialize again for new message
+                struct iovec deSerializeBuffer;
+                deSerializeBuffer.iov_base = message->payload.payload;
+                deSerializeBuffer.iov_len = message->payload.length;
+                celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+                                                             message->header.msgId,
+                                                             message->header.msgMajorVersion,
+                                                             message->header.msgMinorVersion,
+                                                              &deSerializeBuffer, 0, msg);
+                if (status != CELIX_SUCCESS) {
+                    L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                           receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                    break;
+                }
+            }
+            *release = true;
+        }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
-    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
+    const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
     if (msgFqn == NULL) {
         L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
         return;
     }
-
     void *deserializedMsg = NULL;
-    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId,
+                                                                    message->header.msgMajorVersion,
+                                                                    message->header.msgMinorVersion);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
-        deSerializeBuffer.iov_len  = message->payload.length;
-        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+        deSerializeBuffer.iov_len = message->payload.length;
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+                                                                     message->header.msgMajorVersion,
+                                                                     message->header.msgMinorVersion,
+                                                                     &deSerializeBuffer, 0, &deserializedMsg);
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
-            bool release = true;
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
+                                                                  deserializedMsg, &metadata);
             if (cont) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
-                    if (!release && hashMapIterator_hasNext(&iter2)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
-                        }
-                        release = true;
-                    }
-                }
+                bool release;
+                callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
+                                                                 deserializedMsg);
                 }
             }
         } else {
-            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
         L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
                msgFqn,
                pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
-               (int)message->header.msgMajorVersion,
-               (int)message->header.msgMinorVersion,
+               (int) message->header.msgMajorVersion,
+               (int) message->header.msgMinorVersion,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
                pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
 }
 
-static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, message, receiveTime);
-        }
-    }
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
-
 static void* psa_zmq_recvThread(void * data) {
     pubsub_zmq_topic_receiver_t *receiver = data;
 
@@ -627,20 +615,16 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    int rc = 0;
-                    if (svc != NULL && svc->init != NULL) {
-                        rc = svc->init(svc->handle);
-                    }
-                    if (rc == 0) {
-                        //note now only initialized on first subscriber entries added.
-                        entry->initialized = true;
-                    } else {
-                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                        allInitialized = false;
-                    }
+                int rc = 0;
+                if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) {
+                    rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+                }
+                if (rc == 0) {
+                    //note now only initialized on first subscriber entries added.
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 801fd35..6595853 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -37,7 +37,7 @@ celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *
 
 bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
 void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
 void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
 
 #ifdef __cplusplus
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 331a41d..6118fbe 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -18,7 +18,8 @@
  */
 #include "celix_bundle_context.h"
 #include "celix_constants.h"
-#include "utils.h"
+#include "celix_array_list.h"
+#include "celix_utils.h"
 
 #include "pubsub_interceptors_handler.h"
 
@@ -91,8 +92,8 @@ void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const cel
     celixThreadMutex_lock(&handler->lock);
 
     bool exists = false;
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor == svc) {
             exists = true;
         }
@@ -114,11 +115,11 @@ void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attr
 
     celixThreadMutex_lock(&handler->lock);
 
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor == svc) {
-            void *old = arrayList_remove(handler->interceptors, i);
-            free(old);
+            celix_arrayList_removeAt(handler->interceptors, i);
+            free(entry);
             break;
         }
     }
@@ -131,12 +132,12 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
 
     celixThreadMutex_lock(&handler->lock);
 
-    if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) {
+    if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) {
         *metadata = celix_properties_create();
     }
 
-    for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
-        entry_t *entry = arrayList_get(handler->interceptors, i - 1);
+    for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1);
         if (entry->interceptor->preSend != NULL) {
             cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
         }
@@ -153,8 +154,8 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
 void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     celixThreadMutex_lock(&handler->lock);
 
-    for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
-        entry_t *entry = arrayList_get(handler->interceptors, i - 1);
+    for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1);
         if (entry->interceptor->postSend != NULL) {
             entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
         }
@@ -163,15 +164,17 @@ void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *hand
     celixThreadMutex_unlock(&handler->lock);
 }
 
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata) {
     bool cont = true;
 
     celixThreadMutex_lock(&handler->lock);
-
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) {
+        *metadata = celix_properties_create();
+    }
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor->preReceive != NULL) {
-            cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+            cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
         }
         if (!cont) {
             break;
@@ -186,8 +189,8 @@ bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *ha
 void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     celixThreadMutex_lock(&handler->lock);
 
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor->postReceive != NULL) {
             entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
         }