You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/07/26 18:55:00 UTC

[celix] branch feature/pubsub-interceptor-fix updated (6229f9e -> f0cf244)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a change to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git.


    from 6229f9e  Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.
     new 6bf1ef1  Adds interceptor support to the pubsub websocket
     new 02a9627  Updates cmake configuration and fixes invalid pubsub cmake aliases.
     new 3f0014b  Removes interceptor support from psa zmq/tcp v1 (the behavior is different from v2)
     new 0b04fde  Fixes memleak in pubsub topology manager
     new f0cf244  Adds some additional pubsub interceptor integration tests

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bundles/cxx_remote_services/CMakeLists.txt         |   3 +-
 .../discovery_configured/CMakeLists.txt            |   1 +
 .../cxx_remote_services/integration/CMakeLists.txt |  20 +-
 .../integration/gtest/CMakeLists.txt               |  16 +-
 bundles/http_admin/http_admin/CMakeLists.txt       |   1 +
 bundles/logging/log_admin/CMakeLists.txt           |   1 +
 bundles/logging/log_service_v2/CMakeLists.txt      |   2 +
 .../log_writers/syslog_writer/CMakeLists.txt       |   1 +
 bundles/pubsub/CMakeLists.txt                      |  12 +-
 bundles/pubsub/examples/CMakeLists.txt             | 156 ++++++------
 bundles/pubsub/integration/CMakeLists.txt          | 262 ++++++++++++---------
 .../gtest/PubSubInterceptorTestSuite.cc            | 245 ++++++++++++++-----
 bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt  |   4 +-
 .../v1/src/pubsub_tcp_topic_receiver.c             |  54 ++---
 .../v1/src/pubsub_tcp_topic_sender.c               |  87 +++----
 bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt  |   2 +-
 .../v2/src/pubsub_tcp_topic_receiver.c             |  21 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |   1 +
 bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt  |   2 +-
 .../pubsub_admin_websocket/v1/CMakeLists.txt       |   4 +-
 .../pubsub_admin_websocket/v2/CMakeLists.txt       |   2 +-
 .../v2/src/pubsub_websocket_common.h               |   2 +-
 .../v2/src/pubsub_websocket_topic_receiver.c       | 210 +++++++++--------
 .../v2/src/pubsub_websocket_topic_sender.c         |  13 +
 bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt  |   2 +-
 .../v1/src/pubsub_zmq_topic_receiver.c             |  45 ++--
 .../v1/src/pubsub_zmq_topic_sender.c               | 232 +++++++++---------
 bundles/pubsub/pubsub_admin_zmq/v2/CMakeLists.txt  |   2 +-
 .../v2/src/pubsub_zmq_topic_receiver.c             |  22 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |   7 +-
 bundles/pubsub/pubsub_discovery/CMakeLists.txt     |   2 +-
 .../pubsub_protocol_wire_v1/CMakeLists.txt         |   2 +-
 .../pubsub_protocol_wire_v2/CMakeLists.txt         |   2 +-
 .../pubsub_serializer_avrobin/CMakeLists.txt       |   2 +-
 .../pubsub/pubsub_serializer_json/CMakeLists.txt   |   2 +-
 .../pubsub/pubsub_topology_manager/CMakeLists.txt  |   2 +-
 .../src/pubsub_topology_manager.c                  |   9 +-
 bundles/shell/remote_shell/CMakeLists.txt          |   1 +
 bundles/shell/shell/CMakeLists.txt                 |   2 +
 bundles/shell/shell_bonjour/CMakeLists.txt         |   1 +
 bundles/shell/shell_tui/CMakeLists.txt             |   1 +
 bundles/shell/shell_wui/CMakeLists.txt             |   1 +
 cmake/AddGTest.cmake                               |   2 +-
 cmake/CelixConfig.cmake                            |  11 +
 cmake/cmake_celix/BundlePackaging.cmake            | 109 ++++-----
 cmake/cmake_celix/Generic.cmake                    |  13 +-
 documents/cmake_commands/README.md                 |  28 +++
 47 files changed, 896 insertions(+), 726 deletions(-)

[celix] 01/05: Adds interceptor support to the pubsub websocket

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 6bf1ef1e7e71b4fea6589f4639eb3b3b30e09de3
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jul 26 15:35:37 2021 +0200

    Adds interceptor support to the pubsub websocket
---
 .../v2/src/pubsub_tcp_topic_receiver.c             |  11 +-
 .../v2/src/pubsub_websocket_common.h               |   2 +-
 .../v2/src/pubsub_websocket_topic_receiver.c       | 211 ++++++++++-----------
 .../v2/src/pubsub_websocket_topic_sender.c         |  11 ++
 .../v2/src/pubsub_zmq_topic_receiver.c             |  11 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |   2 +-
 6 files changed, 126 insertions(+), 122 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 36816f1..e9ef6b4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -446,7 +446,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
                                                                              message->header.msgMinorVersion,
                                                                              &deSerializeBuffer, 0, msg);
                 if (status != CELIX_SUCCESS) {
-                    L_WARN("[PSA_TCO_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                    L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
                            receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
                     break;
                 }
@@ -484,17 +484,14 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
         }
 
         if (status == CELIX_SUCCESS) {
-            uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
-                                                                  deSerializedMsg, &metadata);
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata);
             if (cont) {
                 bool release;
                 callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
-                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
-                                                                 deSerializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
                 }
             }
         } else {
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
index 4c22319..8a764d1 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
@@ -27,7 +27,7 @@
 
 
 struct pubsub_websocket_msg_header {
-    const char *id; //FQN
+    const char *fqn;
     uint8_t major;
     uint8_t minor;
     uint32_t seqNr;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index ea997e9..56f4008 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -37,12 +37,14 @@
 #include <jansson.h>
 #include <pubsub_utils.h>
 #include <celix_api.h>
+#include "pubsub_interceptors_handler.h"
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN 37
 #endif
 
-
+#define L_TRACE(...) \
+    celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
 #define L_DEBUG(...) \
     celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -72,6 +74,7 @@ struct pubsub_websocket_topic_receiver {
     char *uri;
 
     pubsub_serializer_handler_t* serializerHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     celix_websocket_service_t sockSvc;
     long svcId;
@@ -93,7 +96,7 @@ struct pubsub_websocket_topic_receiver {
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = bnd id, value = psa_websocket_subscriber_entry_t
+        hash_map_t *map; //key = long svc id, value = psa_websocket_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -111,13 +114,13 @@ typedef struct psa_websocket_requested_connection_entry {
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
-    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+    pubsub_subscriber_t* subscriberSvc;
     bool initialized; //true if the init function is called through the receive thread
 } psa_websocket_subscriber_entry_t;
 
 
-static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
-static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
 static void* psa_websocket_recvThread(void * data);
 static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver);
 static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver);
@@ -138,6 +141,8 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
     receiver->serializerHandler = serializerHandler;
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+                                                                     pubsub_serializerHandler_getSerializationType(serializerHandler));
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->admin = admin;
@@ -166,8 +171,8 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = receiver;
-        opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber;
+        opts.addWithProperties = pubsub_websocketTopicReceiver_addSubscriber;
+        opts.removeWithProperties = pubsub_websocketTopicReceiver_removeSubscriber;
 
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
@@ -262,22 +267,13 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         celix_bundleContext_unregisterService(receiver->ctx, receiver->svcId);
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL)  {
-                hashMap_destroy(entry->subscriberServices, false, false);
-                free(entry);
-            }
-
-        }
-        hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.map, false, true);
 
 
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
@@ -307,6 +303,8 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         }
         celix_arrayList_destroy(receiver->recvBuffer.list);
 
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
+
         free(receiver->uri);
         free(receiver->scope);
         free(receiver->topic);
@@ -394,10 +392,9 @@ void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receive
     free(key);
 }
 
-static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
@@ -414,96 +411,113 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
         return;
     }
 
+    psa_websocket_subscriber_entry_t* entry = calloc(1, sizeof(*entry));
+    entry->subscriberSvc = svc;
+    entry->initialized = false;
+
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry == NULL) {
-        //new create entry
-        entry = calloc(1, sizeof(*entry));
-        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->initialized = false;
-        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-    }
-    hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props __attribute__((unused)), const celix_bundle_t *bnd) {
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_websocket_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId);
+    free(entry);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
 
+static void callReceivers(
+        pubsub_websocket_topic_receiver_t *receiver,
+        uint32_t msgId,
+        const pubsub_websocket_msg_header_t* header,
+        const char *payload,
+        size_t payloadSize,
+        void** msg,
+        bool* release,
+        const celix_properties_t* metadata) {
+    *release = true;
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_remove(entry->subscriberServices, (void*)svcId);
-    }
-    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
-        //remove entry
-        hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        hashMap_destroy(entry->subscriberServices, false, false);
-        free(entry);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+            entry->subscriberSvc->receive(entry->subscriberSvc->handle, header->fqn, msgId, *msg, metadata, release);
+            if (!(*release)) {
+                //receive function has taken ownership, deserialize again for new message
+                struct iovec deSerializeBuffer;
+                deSerializeBuffer.iov_base = (void*) payload;
+                deSerializeBuffer.iov_len = payloadSize;
+                celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+                                                                             msgId,
+                                                                             header->major,
+                                                                             header->minor,
+                                                                             &deSerializeBuffer, 0, msg);
+                if (status != CELIX_SUCCESS) {
+                    L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn,
+                           receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                    break;
+                }
+            }
+            *release = true;
+        }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) {
-    //NOTE receiver->subscribers.mutex locked
-
-    uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id);
-
+static void processJsonMsg(pubsub_websocket_topic_receiver_t *receiver, const pubsub_websocket_msg_header_t* header, const char *payload, size_t payloadSize) {
+    uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, header->fqn);
     if (msgId == 0) {
-        L_WARN("Cannot find msg id for msg fqn %s", hdr->id);
+        L_WARN("Cannot find msg id for msg fqn %s", header->fqn);
         return;
     }
 
-    void *deSerializedMsg = NULL;
-    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor);
+    void *deserializedMsg = NULL;
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, header->major, header->minor);
     if (validVersion) {
         struct iovec deSerializeBuffer;
-        deSerializeBuffer.iov_base = (void *)payload;
-        deSerializeBuffer.iov_len  = payloadSize;
-        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
+        deSerializeBuffer.iov_base = (void*)payload;
+        deSerializeBuffer.iov_len = payloadSize;
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId,
+                                                                     header->major,
+                                                                     header->minor,
+                                                                     &deSerializeBuffer, 0, &deserializedMsg);
         if (status == CELIX_SUCCESS) {
-            hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
-            bool release = true;
-            while (hashMapIterator_hasNext(&iter)) {
-                pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release);
-                if (!release && hashMapIterator_hasNext(&iter)) {
-                    //receive function has taken ownership and still more receive function to come ..
-                    //deserialize again for new message
-                    status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
-                    if (status != CELIX_SUCCESS) {
-                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                        break;
-                    }
-                    release = true;
+            celix_properties_t *metadata = NULL; //NOTE metadata not supported for websocket
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, header->fqn, msgId,
+                                                                  deserializedMsg, &metadata);
+            if (cont) {
+                bool release;
+                callReceivers(receiver, msgId, header, payload, payloadSize, &deserializedMsg, &release, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, header->fqn, msgId, deserializedMsg, metadata);
+                if (release) {
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg);
                 }
             }
-            if (release) {
-                pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg);
-            }
         } else {
-            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
-        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
-               hdr->id,
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+               header->fqn,
                pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
-               (int)hdr->major,
-               (int)hdr->minor,
+               (int) header->major,
+               (int) header->minor,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId),
                pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId));
     }
 }
 
-static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
+static void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
     json_error_t error;
     json_t *jsMsg = json_loadb(msg, msgSize, 0, &error);
-    if(jsMsg != NULL) {
-        json_t *jsId = json_object_get(jsMsg, "id");
+    if (jsMsg != NULL) {
+        json_t *jsId = json_object_get(jsMsg, "id"); //NOTE called id, but is the msgFqn
         json_t *jsMajor = json_object_get(jsMsg, "major");
         json_t *jsMinor = json_object_get(jsMsg, "minor");
         json_t *jsSeqNr = json_object_get(jsMsg, "seqNr");
@@ -511,24 +525,15 @@ static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const
 
         if (jsId && jsMajor && jsMinor && jsSeqNr && jsData) {
             pubsub_websocket_msg_header_t hdr;
-            hdr.id = json_string_value(jsId);
+            hdr.fqn = json_string_value(jsId);
             hdr.major = (uint8_t) json_integer_value(jsMajor);
             hdr.minor = (uint8_t) json_integer_value(jsMinor);
             hdr.seqNr = (uint32_t) json_integer_value(jsSeqNr);
-            const char *payload = json_dumps(jsData, 0);
+            char *payload = json_dumps(jsData, 0);
             size_t payloadSize = strlen(payload);
-            printf("Received msg: id %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.id, hdr.major, hdr.minor, hdr.seqNr, payload);
-
-            celixThreadMutex_lock(&receiver->subscribers.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-                if (entry != NULL) {
-                    processMsgForSubscriberEntry(receiver, entry, &hdr, payload, payloadSize);
-                }
-            }
-            celixThreadMutex_unlock(&receiver->subscribers.mutex);
-            free((void *) payload);
+            L_TRACE("Received msg: fqn %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.fqn, hdr.major, hdr.minor, hdr.seqNr, payload);
+            processJsonMsg(receiver, &hdr, payload, payloadSize);
+            free(payload);
         } else {
             L_WARN("[PSA_WEBSOCKET_TR] Received unsupported message: "
                    "ID = %s, major = %d, minor = %d, seqNr = %d, data valid? %s",
@@ -539,9 +544,7 @@ static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const
         json_decref(jsMsg);
     } else {
         L_WARN("[PSA_WEBSOCKET_TR] Failed to load websocket JSON message, error line: %d, error message: %s", error.line, error.text);
-        return;
     }
-
 }
 
 static void* psa_websocket_recvThread(void * data) {
@@ -728,20 +731,16 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    int rc = 0;
-                    if (svc != NULL && svc->init != NULL) {
-                        rc = svc->init(svc->handle);
-                    }
-                    if (rc == 0) {
-                        //note now only initialized on first subscriber entries added.
-                        entry->initialized = true;
-                    } else {
-                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                        allInitialized = false;
-                    }
+                int rc = 0;
+                if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) {
+                    rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+                }
+                if (rc == 0) {
+                    //note now only initialized on first subscriber entries added.
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index adc5ffe..e435093 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -33,6 +33,7 @@
 #include "http_admin/api.h"
 #include "civetweb.h"
 #include "pubsub_websocket_admin.h"
+#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 
@@ -56,6 +57,7 @@ struct pubsub_websocket_topic_sender {
     char *uri;
 
     pubsub_serializer_handler_t* serializerHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     int seqNr; //atomic
 
@@ -102,6 +104,7 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
     sender->ctx = ctx;
     sender->logHelper = logHelper;
     sender->serializerHandler = serializerHandler;
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_WEBSOCKET_ADMIN_TYPE, pubsub_serializerHandler_getSerializationType(serializerHandler));
 
     psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
     sender->uri = psa_websocket_createURI(scope, topic);
@@ -177,6 +180,7 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         if (sender->scope != NULL) {
             free(sender->scope);
         }
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         free(sender->topic);
         free(sender->uri);
         free(sender);
@@ -263,6 +267,11 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
         return status;
     }
 
+    bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+    if (!cont) {
+        L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        return status;
+    }
 
     if (sender->sockConnection != NULL) {
         delay_first_send_for_late_joiners(sender);
@@ -306,6 +315,8 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
     	status = CELIX_SUCCESS; // Not an error, just nothing to do
     }
 
+    pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+
     return status;
 }
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index d6c5805..f5e70b0 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -64,13 +64,13 @@
 struct pubsub_zmq_topic_receiver {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
     char *scope;
     char *topic;
 
+    pubsub_serializer_handler_t* serializerHandler;
     pubsub_interceptors_handler_t *interceptorsHandler;
 
     void *zmqCtx;
@@ -466,17 +466,14 @@ static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_prot
                                                                      message->header.msgMinorVersion,
                                                                      &deSerializeBuffer, 0, &deserializedMsg);
         if (status == CELIX_SUCCESS) {
-            uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
-                                                                  deserializedMsg, &metadata);
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, &metadata);
             if (cont) {
                 bool release;
                 callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata);
-                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
-                                                                 deserializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
                 }
             }
         } else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index d1f36a5..155cb19 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -52,13 +52,13 @@
 struct pubsub_zmq_topic_sender {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
     uuid_t fwUUID;
     bool zeroCopyEnabled;
 
+    pubsub_serializer_handler_t* serializerHandler;
     pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;

[celix] 03/05: Removes interceptor support from psa zmq/tcp v1 (the behavior is different from v2)

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 3f0014b778b9abb3a2df04e9eca50eee5eca0fb0
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jul 26 19:04:19 2021 +0200

    Removes interceptor support from psa zmq/tcp v1 (the behavior is different from v2)
---
 .../v1/src/pubsub_tcp_topic_receiver.c             |  54 ++---
 .../v1/src/pubsub_tcp_topic_sender.c               |  87 ++++----
 .../v1/src/pubsub_zmq_topic_receiver.c             |  45 ++--
 .../v1/src/pubsub_zmq_topic_sender.c               | 232 ++++++++++-----------
 4 files changed, 182 insertions(+), 236 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index 4178b50..aeda1c3 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -34,7 +34,6 @@
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
-#include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
 
 #ifndef UUID_STR_LEN
@@ -64,7 +63,6 @@ struct pubsub_tcp_topic_receiver {
     bool isPassive;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
-    pubsub_interceptors_handler_t *interceptorsHandler;
 
     struct {
         celix_thread_t thread;
@@ -144,7 +142,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
     const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
     const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
@@ -322,7 +319,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             pubsub_tcpHandler_destroy(receiver->socketHandler);
             receiver->socketHandler = NULL;
         }
-        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -549,39 +545,31 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                const char *msgType = msgSer->msgName;
-                uint32_t msgId = message->header.msgId;
-                celix_properties_t *metadata = message->metadata.metadata;
-                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                if (cont) {
-                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
-                    while (hashMapIterator_hasNext(&iter)) {
-                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
-                        if (!release) {
-                            //receive function has taken ownership, deserialize again for new message
-                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                            if (status != CELIX_SUCCESS) {
-                                L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
-                                       msgSer->msgName,
-                                       receiver->scope == NULL ? "(null)" : receiver->scope,
-                                       receiver->topic);
-                                break;
-                            }
-                            release = true;
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                    if (!release) {
+                        //receive function has taken ownership, deserialize again for new message
+                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
+                                   msgSer->msgName,
+                                   receiver->scope == NULL ? "(null)" : receiver->scope,
+                                   receiver->topic);
+                            break;
                         }
+                        release = true;
                     }
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
-                    }
-                    if (message->metadata.metadata) {
-                        celix_properties_destroy(message->metadata.metadata);
-                    }
-                    updateReceiveCount += 1;
                 }
+                if (release) {
+                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                }
+                if (message->metadata.metadata) {
+                    celix_properties_destroy(message->metadata.metadata);
+                }
+                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
index 32a3328..7bea628 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
@@ -34,7 +34,6 @@
 #include <uuid/uuid.h>
 #include "celix_constants.h"
 #include <pubsub_utils.h>
-#include "pubsub_interceptors_handler.h"
 
 #define TCP_BIND_MAX_RETRY                      10
 
@@ -58,7 +57,6 @@ struct pubsub_tcp_topic_sender {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
-    pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;
     char *topic;
@@ -145,7 +143,6 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     sender->isPassive = false;
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
@@ -303,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
-        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
             pubsub_tcpHandler_destroy(sender->socketHandler);
             sender->socketHandler = NULL;
@@ -531,58 +527,47 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        bool cont = false;
-        if (status == CELIX_SUCCESS) /*ser ok*/ {
-            cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+        pubsub_protocol_message_t message;
+        message.metadata.metadata = NULL;
+        message.payload.payload = NULL;
+        message.payload.length = 0;
+        if (serializedIoVecOutput) {
+            message.payload.payload = serializedIoVecOutput->iov_base;
+            message.payload.length = serializedIoVecOutput->iov_len;
         }
-        if (cont) {
-            pubsub_protocol_message_t message;
-            message.metadata.metadata = NULL;
-            message.payload.payload = NULL;
-            message.payload.length = 0;
-            if (serializedIoVecOutput) {
-                message.payload.payload = serializedIoVecOutput->iov_base;
-                message.payload.length = serializedIoVecOutput->iov_len;
+        message.header.msgId = msgTypeId;
+        message.header.seqNr = entry->seqNr;
+        message.header.msgMajorVersion = entry->major;
+        message.header.msgMinorVersion = entry->minor;
+        message.header.payloadSize = 0;
+        message.header.payloadPartSize = 0;
+        message.header.payloadOffset = 0;
+        message.header.metadataSize = 0;
+        if (metadata != NULL)
+            message.metadata.metadata = metadata;
+        entry->seqNr++;
+        bool sendOk = true;
+        {
+            int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+            if (rc < 0) {
+                status = -1;
+                sendOk = false;
             }
-            message.header.msgId = msgTypeId;
-            message.header.seqNr = entry->seqNr;
-            message.header.msgMajorVersion = entry->major;
-            message.header.msgMinorVersion = entry->minor;
-            message.header.payloadSize = 0;
-            message.header.payloadPartSize = 0;
-            message.header.payloadOffset = 0;
-            message.header.metadataSize = 0;
-            if (metadata != NULL)
-                message.metadata.metadata = metadata;
-            entry->seqNr++;
-            bool sendOk = true;
-            {
-                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
-                if (rc < 0) {
-                    status = -1;
-                    sendOk = false;
-                }
-                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
-                if (message.metadata.metadata)
-                    celix_properties_destroy(message.metadata.metadata);
-                if (serializedIoVecOutput) {
-                    entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
-                                                    serializedIoVecOutput,
-                                                    serializedIoVecOutputLen);
-                    serializedIoVecOutput = NULL;
-                }
+            if (message.metadata.metadata)
+                celix_properties_destroy(message.metadata.metadata);
+            if (serializedIoVecOutput) {
+                entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
+                                                serializedIoVecOutput,
+                                                serializedIoVecOutputLen);
+                serializedIoVecOutput = NULL;
             }
+        }
 
-            if (sendOk) {
-                sendCountUpdate = 1;
-            } else {
-                sendErrorUpdate = 1;
-                L_WARN("[PSA_TCP_TS] Error sending msg. %s", strerror(errno));
-            }
+        if (sendOk) {
+            sendCountUpdate = 1;
         } else {
-            serializationErrorUpdate = 1;
-            L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic %s/%s", entry->msgSer->msgName,
-                   sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+            sendErrorUpdate = 1;
+            L_WARN("[PSA_TCP_TS] Error sending msg. %s", strerror(errno));
         }
     } else {
         //unknownMessageCountUpdate = 1;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 62b2fbf..28146af 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -39,8 +39,6 @@
 #include <pubsub_utils.h>
 #include <celix_api.h>
 
-#include "pubsub_interceptors_handler.h"
-
 #include "celix_utils_api.h"
 
 #define PSA_ZMQ_RECV_TIMEOUT 1000
@@ -70,8 +68,6 @@ struct pubsub_zmq_topic_receiver {
     char *topic;
     bool metricsEnabled;
 
-    pubsub_interceptors_handler_t *interceptorsHandler;
-
     void *zmqCtx;
     void *zmqSock;
 
@@ -157,8 +153,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
-    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
-
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
     if (keys_bundle_dir == NULL) {
@@ -334,8 +328,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
         zmq_close(receiver->zmqSock);
         zmq_ctx_term(receiver->zmqCtx);
 
-        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
-
         free(receiver->scope);
         free(receiver->topic);
     }
@@ -522,33 +514,26 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 clock_gettime(CLOCK_REALTIME, &endSer);
             }
             if (status == CELIX_SUCCESS) {
-
-                const char *msgType = msgSer->msgName;
-                uint32_t msgId = message->header.msgId;
                 celix_properties_t *metadata = message->metadata.metadata;
-                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata);
                 bool release = true;
-                if (cont) {
-                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                    while (hashMapIterator_hasNext(&iter2)) {
-                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
-                        if (!release) {
-                            //receive function has taken ownership deserialize again for new message
-                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
-                            if (status != CELIX_SUCCESS) {
-                                L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                                break;
-                            }
-                            release = true;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
+                    if (!release) {
+                        //receive function has taken ownership deserialize again for new message
+                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                            break;
                         }
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
+                        release = true;
                     }
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
-                    }
-                    updateReceiveCount += 1;
                 }
+                if (release) {
+                    msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+                }
+                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
index aba8893..19c4660 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
@@ -32,7 +32,6 @@
 #include "pubsub_psa_zmq_constants.h"
 #include <uuid/uuid.h>
 #include "celix_constants.h"
-#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 #define ZMQ_BIND_MAX_RETRY                      10
@@ -57,8 +56,6 @@ struct pubsub_zmq_topic_sender {
     bool metricsEnabled;
     bool zeroCopyEnabled;
 
-    pubsub_interceptors_handler_t *interceptorsHandler;
-
     char *scope;
     char *topic;
     char *url;
@@ -157,8 +154,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
-    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
-
     //setting up zmq socket for ZMQ TopicSender
     {
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -347,8 +342,6 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
 
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
-        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
-
         if (sender->scope != NULL) {
             free(sender->scope);
         }
@@ -573,138 +566,133 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 usleep(500);
             }
 
-            bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
-            if (cont) {
+            pubsub_protocol_message_t message;
+            message.payload.payload = serializedOutput->iov_base;
+            message.payload.length = serializedOutput->iov_len;
 
-                pubsub_protocol_message_t message;
-                message.payload.payload = serializedOutput->iov_base;
-                message.payload.length = serializedOutput->iov_len;
+            void *payloadData = NULL;
+            size_t payloadLength = 0;
+            entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
 
-                void *payloadData = NULL;
-                size_t payloadLength = 0;
-                entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+            if (metadata != NULL) {
+                message.metadata.metadata = metadata;
+                entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize);
+            } else {
+                message.metadata.metadata = NULL;
+            }
 
-                if (metadata != NULL) {
-                    message.metadata.metadata = metadata;
-                    entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize);
-                } else {
-                    message.metadata.metadata = NULL;
+            entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize);
+
+            message.header.msgId = msgTypeId;
+            message.header.seqNr = entry->seqNr;
+            message.header.msgMajorVersion = 0;
+            message.header.msgMinorVersion = 0;
+            message.header.payloadSize = payloadLength;
+            message.header.metadataSize = entry->metadataBufferSize;
+            message.header.payloadPartSize = payloadLength;
+            message.header.payloadOffset = 0;
+            message.header.isLastSegment = 1;
+            message.header.convertEndianess = 0;
+
+            // increase seqNr
+            entry->seqNr++;
+
+            entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize);
+
+            errno = 0;
+            bool sendOk;
+
+            if (bound->parent->zeroCopyEnabled) {
+
+                zmq_msg_t msg1; // Header
+                zmq_msg_t msg2; // Payload
+                zmq_msg_t msg3; // Metadata
+                zmq_msg_t msg4; // Footer
+                void *socket = zsock_resolve(sender->zmq.socket);
+                psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
+                freeMsgEntry->msgSer = entry->msgSer;
+                freeMsgEntry->serializedOutput = serializedOutput;
+                freeMsgEntry->serializedOutputLen = serializedOutputLen;
+
+                zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry);
+                //send header
+                int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+                if (rc == -1) {
+                    L_WARN("Error sending header msg. %s", strerror(errno));
+                    zmq_msg_close(&msg1);
                 }
 
-                entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize);
-
-                message.header.msgId = msgTypeId;
-                message.header.seqNr = entry->seqNr;
-                message.header.msgMajorVersion = 0;
-                message.header.msgMinorVersion = 0;
-                message.header.payloadSize = payloadLength;
-                message.header.metadataSize = entry->metadataBufferSize;
-                message.header.payloadPartSize = payloadLength;
-                message.header.payloadOffset = 0;
-                message.header.isLastSegment = 1;
-                message.header.convertEndianess = 0;
-
-                // increase seqNr
-                entry->seqNr++;
-
-                entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize);
-
-                errno = 0;
-                bool sendOk;
-
-                if (bound->parent->zeroCopyEnabled) {
-
-                    zmq_msg_t msg1; // Header
-                    zmq_msg_t msg2; // Payload
-                    zmq_msg_t msg3; // Metadata
-                    zmq_msg_t msg4; // Footer
-                    void *socket = zsock_resolve(sender->zmq.socket);
-                    psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
-                    freeMsgEntry->msgSer = entry->msgSer;
-                    freeMsgEntry->serializedOutput = serializedOutput;
-                    freeMsgEntry->serializedOutputLen = serializedOutputLen;
-
-                    zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry);
-                    //send header
-                    int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+                //send Payload
+                if (rc > 0) {
+                    int flag = ((entry->metadataBufferSize > 0)  || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
+                    zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+                    rc = zmq_msg_send(&msg2, socket, flag);
                     if (rc == -1) {
-                        L_WARN("Error sending header msg. %s", strerror(errno));
-                        zmq_msg_close(&msg1);
-                    }
-
-                    //send Payload
-                    if (rc > 0) {
-                        int flag = ((entry->metadataBufferSize > 0)  || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
-                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
-                        rc = zmq_msg_send(&msg2, socket, flag);
-                        if (rc == -1) {
-                            L_WARN("Error sending payload msg. %s", strerror(errno));
-                            zmq_msg_close(&msg2);
-                        }
-                    }
-
-                    //send MetaData
-                    if (rc > 0 && entry->metadataBufferSize > 0) {
-                        int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
-                        zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL);
-                        rc = zmq_msg_send(&msg3, socket, flag);
-                        if (rc == -1) {
-                            L_WARN("Error sending metadata msg. %s", strerror(errno));
-                            zmq_msg_close(&msg3);
-                        }
-                    }
-
-                    //send Footer
-                    if (rc > 0 && entry->footerBufferSize > 0) {
-                        zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL);
-                        rc = zmq_msg_send(&msg4, socket, 0);
-                        if (rc == -1) {
-                            L_WARN("Error sending footer msg. %s", strerror(errno));
-                            zmq_msg_close(&msg4);
-                        }
-                    }
-
-                    sendOk = rc > 0;
-                } else {
-                    //no zero copy
-                    zmsg_t *msg = zmsg_new();
-                    zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
-                    zmsg_addmem(msg, payloadData, payloadLength);
-                    if (entry->metadataBufferSize > 0) {
-                        zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
-                    }
-                    if (entry->footerBufferSize > 0) {
-                        zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
+                        L_WARN("Error sending payload msg. %s", strerror(errno));
+                        zmq_msg_close(&msg2);
                     }
-                    int rc = zmsg_send(&msg, sender->zmq.socket);
-                    sendOk = rc == 0;
+                }
 
-                    if (!sendOk) {
-                        zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+                //send MetaData
+                if (rc > 0 && entry->metadataBufferSize > 0) {
+                    int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
+                    zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL);
+                    rc = zmq_msg_send(&msg3, socket, flag);
+                    if (rc == -1) {
+                        L_WARN("Error sending metadata msg. %s", strerror(errno));
+                        zmq_msg_close(&msg3);
                     }
+                }
 
-                    // Note: serialized Payload is deleted by serializer
-                    if (payloadData && (payloadData != message.payload.payload)) {
-                        free(payloadData);
+                //send Footer
+                if (rc > 0 && entry->footerBufferSize > 0) {
+                    zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL);
+                    rc = zmq_msg_send(&msg4, socket, 0);
+                    if (rc == -1) {
+                        L_WARN("Error sending footer msg. %s", strerror(errno));
+                        zmq_msg_close(&msg4);
                     }
-
-                    __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
                 }
-                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
 
-                if (message.metadata.metadata) {
-                    celix_properties_destroy(message.metadata.metadata);
+                sendOk = rc > 0;
+            } else {
+                //no zero copy
+                zmsg_t *msg = zmsg_new();
+                zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
+                zmsg_addmem(msg, payloadData, payloadLength);
+                if (entry->metadataBufferSize > 0) {
+                    zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
                 }
-                if (!bound->parent->zeroCopyEnabled && serializedOutput) {
-                    entry->msgSer->freeSerializeMsg(entry->msgSer->handle, serializedOutput, serializedOutputLen);
+                if (entry->footerBufferSize > 0) {
+                    zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
                 }
+                int rc = zmsg_send(&msg, sender->zmq.socket);
+                sendOk = rc == 0;
 
-                if (sendOk) {
-                    sendCountUpdate = 1;
-                } else {
-                    sendErrorUpdate = 1;
-                    L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+                if (!sendOk) {
+                    zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+                }
+
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message.payload.payload)) {
+                    free(payloadData);
                 }
+
+                __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
+            }
+
+            if (message.metadata.metadata) {
+                celix_properties_destroy(message.metadata.metadata);
+            }
+            if (!bound->parent->zeroCopyEnabled && serializedOutput) {
+                entry->msgSer->freeSerializeMsg(entry->msgSer->handle, serializedOutput, serializedOutputLen);
+            }
+
+            if (sendOk) {
+                sendCountUpdate = 1;
+            } else {
+                sendErrorUpdate = 1;
+                L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
             }
         } else {
             serializationErrorUpdate = 1;

[celix] 04/05: Fixes memory leak in pubsub topology manager

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 0b04fde022c3cd9b681837d629fcb66d466be203
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jul 26 19:58:29 2021 +0200

    Fixes memory leak in pubsub topology manager
---
 .../pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index eca3796..fdeb20c 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -932,15 +932,13 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
                 psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId,
                                     &protSvcId);
                 if (score > highestScore) {
-                    if (topicPropertiesForHighestMatch != NULL) {
-                        celix_properties_destroy(topicPropertiesForHighestMatch);
-                    }
+                    celix_properties_destroy(topicPropertiesForHighestMatch);
                     highestScore = score;
                     serializerSvcId = serSvcId;
                     protocolSvcId = protSvcId;
                     selectedPsaSvcId = svcId;
                     topicPropertiesForHighestMatch = topicProps;
-                } else if (topicProps != NULL) {
+                } else {
                     celix_properties_destroy(topicProps);
                 }
             }
@@ -968,6 +966,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
                                       entry->scope,
                                       entry->topic,
                                       celix_filter_getFilterString(entry->publisherFilter));
+                celix_properties_destroy(topicPropertiesForHighestMatch);
             }
         }
     }
@@ -997,6 +996,8 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
             celixThreadMutex_unlock(&manager->topicSenders.mutex);
         } else {
             celix_logHelper_warning(manager->loghelper, "Cannot setup TopicSender for %s/%s\n", setupEntry->scope == NULL ? "(null)" : setupEntry->scope, setupEntry->topic);
+            celix_properties_destroy(setupEntry->topicProperties);
+            celix_properties_destroy(setupEntry->endpointResult);
         }
         free(setupEntry->scope);
         free(setupEntry->topic);

[celix] 02/05: Updates cmake configuration and fixes invalid pubsub cmake aliases.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 02a962787f3291e9858bd37c5501ae97e8f14bef
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jul 26 16:25:31 2021 +0200

    Updates cmake configuration and fixes invalid pubsub cmake aliases.
---
 bundles/cxx_remote_services/CMakeLists.txt         |   3 +-
 .../discovery_configured/CMakeLists.txt            |   1 +
 .../cxx_remote_services/integration/CMakeLists.txt |  20 +-
 .../integration/gtest/CMakeLists.txt               |  16 +-
 bundles/http_admin/http_admin/CMakeLists.txt       |   1 +
 bundles/logging/log_admin/CMakeLists.txt           |   1 +
 bundles/logging/log_service_v2/CMakeLists.txt      |   2 +
 .../log_writers/syslog_writer/CMakeLists.txt       |   1 +
 bundles/pubsub/CMakeLists.txt                      |  12 +-
 bundles/pubsub/examples/CMakeLists.txt             | 156 ++++++++--------
 bundles/pubsub/integration/CMakeLists.txt          | 202 ++++++++++-----------
 bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt  |   4 +-
 bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt  |   2 +-
 bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt  |   2 +-
 .../pubsub_admin_websocket/v1/CMakeLists.txt       |   4 +-
 .../pubsub_admin_websocket/v2/CMakeLists.txt       |   2 +-
 bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt  |   2 +-
 bundles/pubsub/pubsub_admin_zmq/v2/CMakeLists.txt  |   2 +-
 bundles/pubsub/pubsub_discovery/CMakeLists.txt     |   2 +-
 .../pubsub_protocol_wire_v1/CMakeLists.txt         |   2 +-
 .../pubsub_protocol_wire_v2/CMakeLists.txt         |   2 +-
 .../pubsub_serializer_avrobin/CMakeLists.txt       |   2 +-
 .../pubsub/pubsub_serializer_json/CMakeLists.txt   |   2 +-
 .../pubsub/pubsub_topology_manager/CMakeLists.txt  |   2 +-
 bundles/shell/remote_shell/CMakeLists.txt          |   1 +
 bundles/shell/shell/CMakeLists.txt                 |   2 +
 bundles/shell/shell_bonjour/CMakeLists.txt         |   1 +
 bundles/shell/shell_tui/CMakeLists.txt             |   1 +
 bundles/shell/shell_wui/CMakeLists.txt             |   1 +
 cmake/AddGTest.cmake                               |   2 +-
 cmake/CelixConfig.cmake                            |  11 ++
 cmake/cmake_celix/BundlePackaging.cmake            | 109 ++++++-----
 cmake/cmake_celix/Generic.cmake                    |  13 +-
 documents/cmake_commands/README.md                 |  28 +++
 34 files changed, 330 insertions(+), 284 deletions(-)

diff --git a/bundles/cxx_remote_services/CMakeLists.txt b/bundles/cxx_remote_services/CMakeLists.txt
index f09ae80..ac98684 100644
--- a/bundles/cxx_remote_services/CMakeLists.txt
+++ b/bundles/cxx_remote_services/CMakeLists.txt
@@ -17,7 +17,8 @@
 
 celix_subproject(REMOTE_SERVICE_ADMIN "Option to enable building the C++17 Remote Service Admin Service bundles" OFF)
 if (REMOTE_SERVICE_ADMIN)
-    message(WARNING "The C++ Remote Service Admin is still experimental; The API, SPI and implementation is not stable and will change")
+    message(STATUS  "The C++ Remote Service Admin is still experimental; The API, SPI and implementation is not stable and will change")
+
     add_subdirectory(rsa_spi)
     add_subdirectory(admin)
     add_subdirectory(discovery_configured)
diff --git a/bundles/cxx_remote_services/discovery_configured/CMakeLists.txt b/bundles/cxx_remote_services/discovery_configured/CMakeLists.txt
index 39850ef..49f31dd 100644
--- a/bundles/cxx_remote_services/discovery_configured/CMakeLists.txt
+++ b/bundles/cxx_remote_services/discovery_configured/CMakeLists.txt
@@ -28,6 +28,7 @@ add_celix_bundle(RsaConfiguredDiscovery
     VERSION 0.9.0
     SYMBOLIC_NAME "apache::celix::RsaConfiguredDiscovery"
     NAME "Apache Celix Async Configured Discovery"
+    FILENAME celix_RsaConfiguredDiscovery
     GROUP "Celix/RSA"
     SOURCES
         src/ConfiguredDiscoveryManager.cc
diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt b/bundles/cxx_remote_services/integration/CMakeLists.txt
index 4b578ce..860a081 100644
--- a/bundles/cxx_remote_services/integration/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -58,11 +58,11 @@ add_celix_container(RemoteCalculatorProvider
             Celix::shell_tui
 
             #Pubsub needed for remote services on pubsub
-            Celix::pubsub_serializer_json
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq_v2
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_discovery_etcd
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq_v2
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_discovery_etcd
 
             #Remote Services
             Celix::RemoteServiceAdmin
@@ -81,11 +81,11 @@ add_celix_container(RemoteCalculatorConsumer
             Celix::shell_tui
 
             #Pubsub needed for remote services on pubsub
-            Celix::pubsub_serializer_json
-            Celix::pubsub_topology_manager
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_admin_zmq_v2
-            Celix::pubsub_protocol_wire_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_admin_zmq_v2
+            Celix::celix_pubsub_protocol_wire_v2
 
             #Remote Services
             Celix::RsaConfiguredDiscovery
diff --git a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
index 5c548ec..81386b9 100644
--- a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
@@ -23,10 +23,10 @@ target_compile_options(test_cxx_remote_services_integration PRIVATE -std=c++17)
 target_include_directories(test_cxx_remote_services_integration PRIVATE ../include) #Add ICalculator
 
 add_celix_bundle_dependencies(test_cxx_remote_services_integration
-    Celix::pubsub_serializer_json
-    Celix::pubsub_topology_manager
-    Celix::pubsub_admin_zmq_v2
-    Celix::pubsub_protocol_wire_v2
+    Celix::celix_pubsub_serializer_json
+    Celix::celix_pubsub_topology_manager
+    Celix::celix_pubsub_admin_zmq_v2
+    Celix::celix_pubsub_protocol_wire_v2
     Celix::RsaConfiguredDiscovery
     Celix::RemoteServiceAdmin
     TestExportImportRemoteServiceFactory
@@ -34,16 +34,16 @@ add_celix_bundle_dependencies(test_cxx_remote_services_integration
     CalculatorConsumer
 )
 
-celix_get_bundle_file(Celix::pubsub_serializer_json PS_SER_BUNDLE_LOC)
+celix_get_bundle_file(Celix::celix_pubsub_serializer_json PS_SER_BUNDLE_LOC)
 target_compile_definitions(test_cxx_remote_services_integration PRIVATE PS_SER_BUNDLE_LOC="${PS_SER_BUNDLE_LOC}")
 
-celix_get_bundle_file(Celix::pubsub_topology_manager PS_PSTM_BUNDLE_LOC)
+celix_get_bundle_file(Celix::celix_pubsub_topology_manager PS_PSTM_BUNDLE_LOC)
 target_compile_definitions(test_cxx_remote_services_integration PRIVATE PS_PSTM_BUNDLE_LOC="${PS_PSTM_BUNDLE_LOC}")
 
-celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PS_PSA_BUNDLE_LOC)
+celix_get_bundle_file(Celix::celix_pubsub_admin_zmq_v2 PS_PSA_BUNDLE_LOC)
 target_compile_definitions(test_cxx_remote_services_integration PRIVATE PS_PSA_BUNDLE_LOC="${PS_PSA_BUNDLE_LOC}")
 
-celix_get_bundle_file(Celix::pubsub_protocol_wire_v2 PS_WIRE_BUNDLE_LOC)
+celix_get_bundle_file(Celix::celix_pubsub_protocol_wire_v2 PS_WIRE_BUNDLE_LOC)
 target_compile_definitions(test_cxx_remote_services_integration PRIVATE PS_WIRE_BUNDLE_LOC="${PS_WIRE_BUNDLE_LOC}")
 
 celix_get_bundle_file(Celix::RsaConfiguredDiscovery RS_DISCOVERY_BUNDLE_LOC)
diff --git a/bundles/http_admin/http_admin/CMakeLists.txt b/bundles/http_admin/http_admin/CMakeLists.txt
index 6a24137..7153b61 100644
--- a/bundles/http_admin/http_admin/CMakeLists.txt
+++ b/bundles/http_admin/http_admin/CMakeLists.txt
@@ -25,6 +25,7 @@ add_celix_bundle(http_admin
     SYMBOLIC_NAME "apache_celix_http_admin"
     GROUP "Celix/HTTP_admin"
     NAME "Apache Celix HTTP Admin"
+    FILENAME celix_http_admin
 )
 target_include_directories(http_admin PRIVATE src)
 
diff --git a/bundles/logging/log_admin/CMakeLists.txt b/bundles/logging/log_admin/CMakeLists.txt
index 828f78d..3d2bdff 100644
--- a/bundles/logging/log_admin/CMakeLists.txt
+++ b/bundles/logging/log_admin/CMakeLists.txt
@@ -23,6 +23,7 @@ add_celix_bundle(log_admin
 	SOURCES
 		src/celix_log_admin.c
 		src/celix_log_admin_activator.c
+	FILENAME celix_log_admin
 )
 target_link_libraries(log_admin PRIVATE Celix::log_helper Celix::shell_api)
 target_include_directories(log_admin PRIVATE src)
diff --git a/bundles/logging/log_service_v2/CMakeLists.txt b/bundles/logging/log_service_v2/CMakeLists.txt
index 796269e..ecefd3e 100644
--- a/bundles/logging/log_service_v2/CMakeLists.txt
+++ b/bundles/logging/log_service_v2/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_celix_bundle(log_service
 	SYMBOLIC_NAME "apache_celix_log_service"
 	NAME "Apache Celix Log Service"
+	FILENAME celix_log_service
 	GROUP "Celix/Logging"
 	VERSION "1.1.0"
 	SOURCES
@@ -36,6 +37,7 @@ install_celix_bundle(log_service EXPORT celix COMPONENT logging)
 add_celix_bundle(log_writer_stdout
 	SYMBOLIC_NAME "apache_celix_log_writer_stdout"
 	NAME "Apache Celix Log Writer Stdout"
+	FILENAME celix_log_writer_stdout
 	GROUP "Celix/Logging"
 	VERSION "1.1.0"
 	SOURCES
diff --git a/bundles/logging/log_writers/syslog_writer/CMakeLists.txt b/bundles/logging/log_writers/syslog_writer/CMakeLists.txt
index 428b286..e0678dd 100644
--- a/bundles/logging/log_writers/syslog_writer/CMakeLists.txt
+++ b/bundles/logging/log_writers/syslog_writer/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_celix_bundle(syslog_writer
 		SYMBOLIC_NAME "apache_celix_syslog_writer"
 		NAME "Apache Celix Syslog Writer"
+		FILENAME celix_syslog_writer
 		GROUP "Celix/Logging"
 		VERSION "1.0.0"
 		SOURCES
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 482ee3a..01d2140 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -21,16 +21,16 @@ if (PUBSUB)
     option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" ON)
     if (BUILD_PUBSUB_PSA_ZMQ)
         option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
-        add_subdirectory(pubsub_admin_zmq/v1) #TODO option for v1 admins
+        add_subdirectory(pubsub_admin_zmq/v1)
         add_subdirectory(pubsub_admin_zmq/v2)
-        add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq_v1) #TODO move to config and set to v2
+        add_library(Celix::celix_pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq_v1)
     endif (BUILD_PUBSUB_PSA_ZMQ)
 
     option(BUILD_PUBSUB_PSA_TCP "Build TCP PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_TCP)
-        add_subdirectory(pubsub_admin_tcp/v1) #TODO option for v1 admins
+        add_subdirectory(pubsub_admin_tcp/v1)
         add_subdirectory(pubsub_admin_tcp/v2)
-        add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp_v1) #TODO move to config and set to v2
+        add_library(Celix::celix_pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp_v1)
     endif (BUILD_PUBSUB_PSA_TCP)
 
     option(BUILD_PUBSUB_PSA_UDP_MC "Build UDP MC PubSub Admin" ON)
@@ -40,9 +40,9 @@ if (PUBSUB)
 
     option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
-        add_subdirectory(pubsub_admin_websocket/v1) #TODO option for v1 admins
+        add_subdirectory(pubsub_admin_websocket/v1)
         add_subdirectory(pubsub_admin_websocket/v2)
-        add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket_v1) #TODO move to config and set to v2
+        add_library(Celix::celix_pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket_v1)
     endif (BUILD_PUBSUB_PSA_WS)
 
     add_subdirectory(pubsub_api)
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index b81c797..a34a28b 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -32,10 +32,10 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
             Celix::log_admin
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_udp_multicast
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_udp_multicast
             celix_pubsub_poi_publisher
             celix_pubsub_poi_publisher2
         PROPERTIES
@@ -51,10 +51,10 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
             Celix::log_admin
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_udp_multicast
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_udp_multicast
             celix_pubsub_poi_subscriber
         PROPERTIES
             PSA_UDPMC_VERBOSE=true
@@ -69,10 +69,10 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
             Celix::log_admin
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_udp_multicast
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_udp_multicast
             celix_pubsub_poi_subscriber
         PROPERTIES
             PSA_UDPMC_VERBOSE=true
@@ -105,11 +105,11 @@ if (BUILD_PUBSUB_PSA_TCP)
                 Celix::log_admin
                 Celix::shell
                 Celix::shell_tui
-                Celix::pubsub_serializer_json
-                Celix::pubsub_discovery_etcd
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_tcp
-                Celix::pubsub_protocol_wire_v2
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_discovery_etcd
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_tcp
+                Celix::celix_pubsub_protocol_wire_v2
                 celix_pubsub_poi_publisher
                 celix_pubsub_poi_publisher2
             PROPERTIES
@@ -125,11 +125,11 @@ if (BUILD_PUBSUB_PSA_TCP)
                 Celix::log_admin
                 Celix::shell
                 Celix::shell_tui
-                Celix::pubsub_serializer_json
-                Celix::pubsub_discovery_etcd
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_tcp
-                Celix::pubsub_protocol_wire_v2
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_discovery_etcd
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_tcp
+                Celix::celix_pubsub_protocol_wire_v2
                 celix_pubsub_poi_subscriber
             PROPERTIES
                 PSA_TCP_VERBOSE=true
@@ -144,11 +144,11 @@ if (BUILD_PUBSUB_PSA_TCP)
                 Celix::log_admin
                 Celix::shell
                 Celix::shell_tui
-                Celix::pubsub_serializer_json
-                Celix::pubsub_discovery_etcd
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_tcp
-                Celix::pubsub_protocol_wire_v2
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_discovery_etcd
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_tcp
+                Celix::celix_pubsub_protocol_wire_v2
                 celix_pubsub_poi_subscriber
             PROPERTIES
                 PSA_TCP_VERBOSE=true
@@ -186,13 +186,13 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             BUNDLES
                 Celix::shell
                 Celix::shell_tui
-                Celix::pubsub_serializer_json
-                Celix::pubsub_discovery_etcd
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_zmq
-                Celix::pubsub_admin_udp_multicast
-                Celix::pubsub_admin_tcp
-                Celix::pubsub_protocol_wire_v1
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_discovery_etcd
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_zmq
+                Celix::celix_pubsub_admin_udp_multicast
+                Celix::celix_pubsub_admin_tcp
+                Celix::celix_pubsub_protocol_wire_v1
                 celix_pubsub_poi_publisher
                 celix_pubsub_poi_publisher2
         )
@@ -203,13 +203,13 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             BUNDLES
                 Celix::shell
                 Celix::shell_tui
-                Celix::pubsub_serializer_json
-                Celix::pubsub_discovery_etcd
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_zmq
-                Celix::pubsub_admin_udp_multicast
-                Celix::pubsub_admin_tcp
-                Celix::pubsub_protocol_wire_v1
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_discovery_etcd
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_zmq
+                Celix::celix_pubsub_admin_udp_multicast
+                Celix::celix_pubsub_admin_tcp
+                Celix::celix_pubsub_protocol_wire_v1
                 celix_pubsub_poi_subscriber
         )
         target_link_libraries(pubsub_subscriber PRIVATE ${PUBSUB_CONTAINER_LIBS})
@@ -221,10 +221,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq
             celix_pubsub_poi_publisher
             celix_pubsub_poi_subscriber
         PROPERTIES
@@ -237,11 +237,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v1
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v1
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq
             celix_pubsub_poi_publisher
             celix_pubsub_poi_publisher2
             celix_pubsub_interceptors_example
@@ -258,11 +258,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v1
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v1
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq
             celix_pubsub_poi_subscriber
             celix_pubsub_interceptors_example
         PROPERTIES
@@ -277,10 +277,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq
             celix_pubsub_poi_subscriber
         PROPERTIES
             PSA_ZMQ_VERBOSE=true
@@ -326,10 +326,10 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_nanomsg
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_nanomsg
             celix_pubsub_poi_publisher
         PROPERTIES
             PSA_NANOMSG_VERBOSE=true
@@ -343,10 +343,10 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_nanomsg
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_nanomsg
             celix_pubsub_poi_publisher
         PROPERTIES
             PSA_NANOMSG_VERBOSE=true
@@ -360,10 +360,10 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_nanomsg
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_nanomsg
             celix_pubsub_poi_subscriber
         PROPERTIES
             PSA_NANOMSG_VERBOSE=true
@@ -377,10 +377,10 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_nanomsg
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_discovery_etcd
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_nanomsg
             celix_pubsub_poi_subscriber
         PROPERTIES
             PSA_NANOMSG_VERBOSE=true
@@ -415,9 +415,9 @@ if (BUILD_PUBSUB_PSA_WS)
             Celix::shell
             Celix::shell_tui
             Celix::http_admin
-            Celix::pubsub_serializer_json
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_websocket
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_websocket
             celix_pubsub_websocket_example
         PROPERTIES
             PSA_TCP_VERBOSE=true
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 9c9ce54..8f21fda 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -145,9 +145,9 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
                 CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-                Celix::pubsub_serializer_json
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_udp_multicast
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_udp_multicast
                 pubsub_sut
                 pubsub_tst
     )
@@ -162,10 +162,10 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_udp_multicast
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_udp_multicast
             Celix::shell
             Celix::shell_tui
             )
@@ -199,10 +199,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v1
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v1
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp
             pubsub_sut
             pubsub_tst
             )
@@ -221,10 +221,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp
             pubsub_sut
             pubsub_tst
             )
@@ -244,10 +244,10 @@ if (BUILD_PUBSUB_PSA_TCP)
         BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp
             pubsub_sut
             pubsub_tst
     )
@@ -266,10 +266,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp
             pubsub_endpoint_tst
             pubsub_endpoint_sut
             pubsub_loopback
@@ -285,10 +285,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp
             Celix::shell
             Celix::shell_tui
             )
@@ -324,10 +324,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v1
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v1
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp_v2
             pubsub_sut
             pubsub_tst
             pubsub_serializer
@@ -347,10 +347,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp_v2
             pubsub_sut
             pubsub_tst
             pubsub_serializer
@@ -371,10 +371,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp_v2
             pubsub_sut
             pubsub_tst
             pubsub_serializer
@@ -394,10 +394,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             BUNDLES
             Celix::shell
             Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp_v2
             pubsub_endpoint_tst
             pubsub_endpoint_sut
             pubsub_loopback
@@ -414,10 +414,10 @@ if (BUILD_PUBSUB_PSA_TCP)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_tcp_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_tcp_v2
             Celix::shell
             Celix::shell_tui
             pubsub_serializer
@@ -454,10 +454,10 @@ if (BUILD_PUBSUB_PSA_WS)
             USE_WEBSOCKETS=true
             LISTENING_PORTS=8080
             BUNDLES
-            Celix::pubsub_serializer_json
+            Celix::celix_pubsub_serializer_json
             Celix::http_admin
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_websocket
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_websocket
             pubsub_sut
             pubsub_tst
             )
@@ -474,10 +474,10 @@ if (BUILD_PUBSUB_PSA_WS)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_websocket
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_websocket
             Celix::shell
             Celix::shell_tui
             )
@@ -505,10 +505,10 @@ if (BUILD_PUBSUB_PSA_WS)
             USE_WEBSOCKETS=true
             LISTENING_PORTS=8080
             BUNDLES
-            Celix::pubsub_serializer_json
+            Celix::celix_pubsub_serializer_json
             Celix::http_admin
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_websocket_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_websocket_v2
             pubsub_sut
             pubsub_tst
             pubsub_serializer
@@ -526,9 +526,9 @@ if (BUILD_PUBSUB_PSA_WS)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_websocket_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_websocket_v2
             Celix::shell
             Celix::shell_tui
             pubsub_serializer
@@ -560,10 +560,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
                 CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-                Celix::pubsub_serializer_json
-                Celix::pubsub_protocol_wire_v1
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_zmq
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_protocol_wire_v1
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_zmq
                 pubsub_sut
                 pubsub_tst
                 pubsub_serializer
@@ -577,10 +577,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
                 CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-                Celix::pubsub_serializer_json
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_zmq_v2
-                Celix::pubsub_protocol_wire_v2
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_zmq_v2
+                Celix::celix_pubsub_protocol_wire_v2
                 pubsub_sut
                 pubsub_tst
                 pubsub_serializer
@@ -604,10 +604,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
         CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
         BUNDLES
-        Celix::pubsub_serializer_json
-        Celix::pubsub_protocol_wire_v2
-        Celix::pubsub_topology_manager
-        Celix::pubsub_admin_zmq
+        Celix::celix_pubsub_serializer_json
+        Celix::celix_pubsub_protocol_wire_v2
+        Celix::celix_pubsub_topology_manager
+        Celix::celix_pubsub_admin_zmq
         pubsub_sut
         pubsub_tst
         pubsub_serializer
@@ -628,10 +628,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 PSA_ZMQ_ZEROCOPY_ENABLED=true
                 CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-                Celix::pubsub_serializer_json
-                Celix::pubsub_protocol_wire_v1
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_zmq
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_protocol_wire_v1
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_zmq
                 Celix::shell
                 Celix::shell_tui
                 pubsub_sut
@@ -648,10 +648,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 PSA_ZMQ_ZEROCOPY_ENABLED=true
                 CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-                Celix::pubsub_serializer_json
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_zmq_v2
-                Celix::pubsub_protocol_wire_v2
+                Celix::celix_pubsub_serializer_json
+                Celix::celix_pubsub_topology_manager
+                Celix::celix_pubsub_admin_zmq_v2
+                Celix::celix_pubsub_protocol_wire_v2
                 Celix::shell
                 Celix::shell_tui
                 pubsub_sut
@@ -679,10 +679,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         PSA_ZMQ_ZEROCOPY_ENABLED=true
         CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
         BUNDLES
-        Celix::pubsub_serializer_json
-        Celix::pubsub_protocol_wire_v2
-        Celix::pubsub_topology_manager
-        Celix::pubsub_admin_zmq
+        Celix::celix_pubsub_serializer_json
+        Celix::celix_pubsub_protocol_wire_v2
+        Celix::celix_pubsub_topology_manager
+        Celix::celix_pubsub_admin_zmq
         Celix::shell
         Celix::shell_tui
         pubsub_sut
@@ -703,10 +703,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_protocol_wire_v2
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_protocol_wire_v2
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq
             Celix::shell
             Celix::shell_tui
             )
@@ -720,10 +720,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
             BUNDLES
-            Celix::pubsub_serializer_json
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq_v2
-            Celix::pubsub_protocol_wire_v2
+            Celix::celix_pubsub_serializer_json
+            Celix::celix_pubsub_topology_manager
+            Celix::celix_pubsub_admin_zmq_v2
+            Celix::celix_pubsub_protocol_wire_v2
             )
     target_compile_definitions(pstm_deadlock_zmq_test PRIVATE -DDEADLOCK_SUT_BUNDLE_FILE=\"${DEADLOCK_SUT_BUNDLE_FILE}\")
     target_link_libraries(pstm_deadlock_zmq_test PRIVATE Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::dfi ZMQ::lib CZMQ::lib)
@@ -763,10 +763,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     setup_target_for_coverage(test_pubsub_topic_and_scope_integration SCAN_DIR ..)
 
     #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
-    celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_protocol_wire_v2 PUBSUB_WIRE_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_protocol_wire_v2 PUBSUB_WIRE_BUNDLE_FILE)
     add_dependencies(test_pubsub_topic_and_scope_integration
             celix_pubsub_serializer_json_bundle
             celix_pubsub_topology_manager_bundle
@@ -791,13 +791,13 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     setup_target_for_coverage(test_pubsub_interceptors_integration SCAN_DIR ..)
 
     #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
-    celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::celix_pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq_v2 Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
+    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
     target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
             PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
             PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
index 2f1bca8..8039b77 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-message(WARNING "PubSub TCP Admin V1 is deprecated, use PubSub TCP Websocket v2 instead")
+message(STATUS "PubSub TCP Admin V1 is deprecated, and will eventually be replaced with PubSub TCP Admin v2")
 
 find_package(UUID REQUIRED)
 
@@ -43,4 +43,4 @@ endif()
 
 install_celix_bundle(celix_pubsub_admin_tcp_v1 EXPORT celix COMPONENT pubsub)
 target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_tcp_v1 ALIAS celix_pubsub_admin_tcp_v1)
+add_library(Celix::celix_pubsub_admin_tcp_v1 ALIAS celix_pubsub_admin_tcp_v1)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
index 117e85a..fce1a42 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
@@ -41,4 +41,4 @@ endif()
 
 install_celix_bundle(celix_pubsub_admin_tcp_v2 EXPORT celix COMPONENT pubsub)
 target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_tcp_v2 ALIAS celix_pubsub_admin_tcp_v2)
+add_library(Celix::celix_pubsub_admin_tcp_v2 ALIAS celix_pubsub_admin_tcp_v2)
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 910c7dd..4cf8fef 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -37,6 +37,6 @@ target_link_libraries(celix_pubsub_admin_udp_multicast PRIVATE Celix::framework
 target_link_libraries(celix_pubsub_admin_udp_multicast PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
 install_celix_bundle(celix_pubsub_admin_udp_multicast EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_admin_udp_multicast ALIAS celix_pubsub_admin_udp_multicast)
+add_library(Celix::celix_pubsub_admin_udp_multicast ALIAS celix_pubsub_admin_udp_multicast)
 
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
index 021310f..4cff18f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-message(WARNING "PubSub Websocket Admin V1 is deprecated, use PubSub ZMQ Websocket v2 instead")
+message(STATUS "PubSub Websocket Admin V1 is deprecated, and will eventually be replaced with PubSub Websocket Admin v2")
 
 find_package(Jansson REQUIRED)
 find_package(UUID REQUIRED)
@@ -44,4 +44,4 @@ target_include_directories(celix_pubsub_admin_websocket_v1 PRIVATE
 
 install_celix_bundle(celix_pubsub_admin_websocket_v1 EXPORT celix COMPONENT pubsub)
 target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket_v1 ALIAS celix_pubsub_admin_websocket_v1)
+add_library(Celix::celix_pubsub_admin_websocket_v1 ALIAS celix_pubsub_admin_websocket_v1)
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
index 6557ddb..966adc1 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
@@ -42,4 +42,4 @@ target_include_directories(celix_pubsub_admin_websocket_v2 PRIVATE
 
 install_celix_bundle(celix_pubsub_admin_websocket_v2 EXPORT celix COMPONENT pubsub)
 target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket_v2 ALIAS celix_pubsub_admin_websocket_v2)
+add_library(Celix::celix_pubsub_admin_websocket_v2 ALIAS celix_pubsub_admin_websocket_v2)
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
index bfd8eef..37ea629 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-message(WARNING "PubSub ZMQ Admin V1 is deprecated, use PubSub ZMQ Admin v2 instead")
+message(STATUS "PubSub ZMQ Admin V1 is deprecated, and will eventually be replaced with PubSub ZMQ Admin v2")
 
 find_package(ZMQ REQUIRED)
 find_package(CZMQ REQUIRED)
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/v2/CMakeLists.txt
index 02340a6..b227df7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/CMakeLists.txt
@@ -55,4 +55,4 @@ target_include_directories(celix_pubsub_admin_zmq_v2 PRIVATE
 
 install_celix_bundle(celix_pubsub_admin_zmq_v2 EXPORT celix COMPONENT pubsub)
 target_link_libraries(celix_pubsub_admin_zmq_v2 PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_zmq_v2 ALIAS celix_pubsub_admin_zmq_v2)
+add_library(Celix::celix_pubsub_admin_zmq_v2 ALIAS celix_pubsub_admin_zmq_v2)
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index 73235e6..8dab81c 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -36,4 +36,4 @@ target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi Celi
 
 install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_discovery_etcd ALIAS celix_pubsub_discovery_etcd)
+add_library(Celix::celix_pubsub_discovery_etcd ALIAS celix_pubsub_discovery_etcd)
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
index d8a1ef8..6cbef5b 100644
--- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/CMakeLists.txt
@@ -35,7 +35,7 @@ target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE celix_wire_protocol_
 
 install_celix_bundle(celix_pubsub_protocol_wire_v1 EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_protocol_wire_v1 ALIAS celix_pubsub_protocol_wire_v1)
+add_library(Celix::celix_pubsub_protocol_wire_v1 ALIAS celix_pubsub_protocol_wire_v1)
 
 if (ENABLE_TESTING)
     add_subdirectory(gtest)
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
index 61324e2..450aa65 100644
--- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/CMakeLists.txt
@@ -35,7 +35,7 @@ target_link_libraries(celix_pubsub_protocol_wire_v2 PRIVATE celix_wire_protocol_
 
 install_celix_bundle(celix_pubsub_protocol_wire_v2 EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_protocol_wire_v2 ALIAS celix_pubsub_protocol_wire_v2)
+add_library(Celix::celix_pubsub_protocol_wire_v2 ALIAS celix_pubsub_protocol_wire_v2)
 
 if (ENABLE_TESTING)
     add_subdirectory(gtest)
diff --git a/bundles/pubsub/pubsub_serializer_avrobin/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_avrobin/CMakeLists.txt
index 1352e82..9b57bc4 100644
--- a/bundles/pubsub/pubsub_serializer_avrobin/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_avrobin/CMakeLists.txt
@@ -36,7 +36,7 @@ target_link_libraries(celix_pubsub_serializer_avrobin PRIVATE Celix::pubsub_spi
 
 install_celix_bundle(celix_pubsub_serializer_avrobin EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_serializer_avrobin ALIAS celix_pubsub_serializer_avrobin)
+add_library(Celix::celix_pubsub_serializer_avrobin ALIAS celix_pubsub_serializer_avrobin)
 
 if (ENABLE_TESTING)
     add_subdirectory(gtest)
diff --git a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
index d1e9eb2..2b295f1 100644
--- a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -36,7 +36,7 @@ target_link_libraries(celix_pubsub_serializer_json PRIVATE Celix::pubsub_spi Cel
 
 install_celix_bundle(celix_pubsub_serializer_json EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_serializer_json ALIAS celix_pubsub_serializer_json)
+add_library(Celix::celix_pubsub_serializer_json ALIAS celix_pubsub_serializer_json)
 
 
 if (ENABLE_TESTING)
diff --git a/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
index e68dac9..98a4de0 100644
--- a/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -32,5 +32,5 @@ target_link_libraries(celix_pubsub_topology_manager PRIVATE UUID::lib)
 
 install_celix_bundle(celix_pubsub_topology_manager EXPORT celix COMPONENT pubsub)
 
-add_library(Celix::pubsub_topology_manager ALIAS celix_pubsub_topology_manager)
+add_library(Celix::celix_pubsub_topology_manager ALIAS celix_pubsub_topology_manager)
 
diff --git a/bundles/shell/remote_shell/CMakeLists.txt b/bundles/shell/remote_shell/CMakeLists.txt
index bdad5c9..92a75ad 100644
--- a/bundles/shell/remote_shell/CMakeLists.txt
+++ b/bundles/shell/remote_shell/CMakeLists.txt
@@ -21,6 +21,7 @@ if (REMOTE_SHELL)
      	SYMBOLIC_NAME "apache_celix_remote_shell"
      	VERSION "0.0.2"
      	NAME "Apache Celix Remote Shell"
+		FILENAME celix_remote_shell
 		GROUP "Celix/Shell"
 		SOURCES
 			src/activator
diff --git a/bundles/shell/shell/CMakeLists.txt b/bundles/shell/shell/CMakeLists.txt
index 0fb5bfb..c89e85a 100644
--- a/bundles/shell/shell/CMakeLists.txt
+++ b/bundles/shell/shell/CMakeLists.txt
@@ -51,6 +51,7 @@ if (SHELL)
         SYMBOLIC_NAME "apache_celix_c_shell"
         VERSION "2.1.0"
         NAME "Apache Celix C Shell"
+		FILENAME celix_shell
 		GROUP "Celix/Shell"
         SOURCES
 			src/c_shell_activator.c
@@ -73,6 +74,7 @@ if (SHELL)
 			SYMBOLIC_NAME "apache::celix::CxxShell"
 			VERSION "2.1.0"
 			NAME "Apache Celix CXX Shell"
+			FILENAME celix_ShellCxx
 			GROUP "Celix/Shell"
 			SOURCES
 				src/Shell.cc
diff --git a/bundles/shell/shell_bonjour/CMakeLists.txt b/bundles/shell/shell_bonjour/CMakeLists.txt
index 771ccdb..e3b0889 100644
--- a/bundles/shell/shell_bonjour/CMakeLists.txt
+++ b/bundles/shell/shell_bonjour/CMakeLists.txt
@@ -28,6 +28,7 @@ if (SHELL_BONJOUR)
 
 	add_celix_bundle(bonjour_shell
 		VERSION "1.0.0"
+		FILENAME celix_bonjour_shell
 		GROUP "Celix/Shell"
 		SOURCES
 		 	private/src/activator.c
diff --git a/bundles/shell/shell_tui/CMakeLists.txt b/bundles/shell/shell_tui/CMakeLists.txt
index 193cb8e..909d5df 100644
--- a/bundles/shell/shell_tui/CMakeLists.txt
+++ b/bundles/shell/shell_tui/CMakeLists.txt
@@ -21,6 +21,7 @@ if (SHELL_TUI)
     	SYMBOLIC_NAME "apache_celix_shell_tui"
     	VERSION "1.1.0"
     	NAME "Apache Celix Shell TUI"
+		FILENAME celix_shell_tui
 		GROUP "Celix/Shell"
     	SOURCES 
     		private/src/activator 
diff --git a/bundles/shell/shell_wui/CMakeLists.txt b/bundles/shell/shell_wui/CMakeLists.txt
index 50bdf0d..9ebc25c 100644
--- a/bundles/shell/shell_wui/CMakeLists.txt
+++ b/bundles/shell/shell_wui/CMakeLists.txt
@@ -22,6 +22,7 @@ if (SHELL_WUI)
         SYMBOLIC_NAME "apache_celix_shell_wui"
         VERSION "1.0.0"
         NAME "Apache Celix Shell WUI"
+        FILENAME celix_shell_wui
         GROUP "Celix/Shell"
         SOURCES
             src/shell_wui_bundle_activator.c
diff --git a/cmake/AddGTest.cmake b/cmake/AddGTest.cmake
index c57d57b..d7719ff 100644
--- a/cmake/AddGTest.cmake
+++ b/cmake/AddGTest.cmake
@@ -19,7 +19,7 @@ include(FetchContent)
 FetchContent_Declare(
         googletest
         GIT_REPOSITORY https://github.com/google/googletest.git
-        GIT_TAG        release-1.10.0
+        GIT_TAG        release-1.11.0
 )
 FetchContent_MakeAvailable(googletest)
 
diff --git a/cmake/CelixConfig.cmake b/cmake/CelixConfig.cmake
index 6c7acb4..0c7caf3 100644
--- a/cmake/CelixConfig.cmake
+++ b/cmake/CelixConfig.cmake
@@ -68,6 +68,17 @@ if (TARGET Celix::dependency_manager_cxx)
   set(CELIX_DM_CXX_STATIC_LIB $<TARGET_PROPERTY:Celix::dependency_manager_cxx,INTERFACE_INCLUDE_DIRECTORIES>)
 endif ()
 
+#Setting up pubsub admin alias to the v1 version. The v1 bundles are exposed as Celix::celix_pubsub_admin_<psa>_v1.
+if (TARGET Celix::celix_pubsub_admin_zmq_v1)
+  add_library(Celix::celix_pubsub_admin_zmq ALIAS Celix::celix_pubsub_admin_zmq_v1)
+endif ()
+if (TARGET Celix::celix_pubsub_admin_tcp_v1)
+  add_library(Celix::celix_pubsub_admin_tcp ALIAS Celix::celix_pubsub_admin_tcp_v1)
+endif ()
+if (TARGET Celix::celix_pubsub_admin_websocket_v1)
+  add_library(Celix::celix_pubsub_admin_websocket ALIAS Celix::celix_pubsub_admin_websocket_v1)
+endif ()
+
 set(CELIX_BUNDLES_DIR ${REL_INSTALL_DIR}/share/celix/bundles)
 set(CELIX_SHELL_BUNDLE ${CELIX_BUNDLES_DIR}/shell.zip)
 set(CELIX_SHELL_TUI_BUNDLE ${CELIX_BUNDLES_DIR}/shell_tui.zip)
diff --git a/cmake/cmake_celix/BundlePackaging.cmake b/cmake/cmake_celix/BundlePackaging.cmake
index d137046..57004c8 100644
--- a/cmake/cmake_celix/BundlePackaging.cmake
+++ b/cmake/cmake_celix/BundlePackaging.cmake
@@ -734,6 +734,58 @@ function(celix_bundle_description BUNDLE DESC)
     set_target_properties(${BUNDLE} PROPERTIES "BUNDLE_DESCRIPTION" ${DESC})
 endfunction()
 
+#[[
+Get bundle filename from an (imported) bundle target taking into account the
+used CMAKE_BUILD_TYPE and available bundle configurations.
+
+celix_get_bundle_filename(<bundle_target> VARIABLE_NAME)
+
+Example: celix_get_bundle_filename(Celix::shell SHELL_BUNDLE_FILENAME)
+
+]]
+function(celix_get_bundle_filename) # ARGV0: bundle target, ARGV1: name of the variable that receives the filename
+    if (TARGET ${ARGV0})
+        get_target_property(_IMP ${ARGV0} BUNDLE_IMPORTED) # selects the imported vs. locally built branch below
+        if (_IMP)
+            _celix_extract_imported_bundle_info(${ARGV0}) # NOTE(review): presumably sets BUNDLE_FILENAME in this scope - confirm
+            set(${ARGV1} ${BUNDLE_FILENAME} PARENT_SCOPE) # hand the result to the caller's scope
+        else ()
+            get_target_property(BF ${ARGV0} BUNDLE_FILENAME) # read the filename from the target property
+            set(${ARGV1} ${BF} PARENT_SCOPE) # hand the result to the caller's scope
+        endif ()
+    else ()
+        message(FATAL_ERROR "Provided argument is not a CMake target: ${ARGV0}")
+    endif ()
+endfunction ()
+
+#[[
+Get bundle file (absolute path to a bundle) from an (imported) bundle
+target taking into account the used CMAKE_BUILD_TYPE and available
+bundle configurations.
+
+celix_get_bundle_file(<bundle_target> VARIABLE_NAME)
+
+Example: celix_get_bundle_file(Celix::shell SHELL_BUNDLE_FILE)
+
+]]
+function(celix_get_bundle_file) # ARGV0: bundle target, ARGV1: name of the variable that receives the bundle file path
+    if (TARGET ${ARGV0})
+        get_target_property(_IMP ${ARGV0} BUNDLE_IMPORTED) # selects the imported vs. locally built branch below
+        if (_IMP)
+            _celix_extract_imported_bundle_info(${ARGV0}) # NOTE(review): presumably sets BUNDLE_FILE and BUNDLE_FILENAME in this scope - confirm
+            set(${ARGV1} ${BUNDLE_FILE} PARENT_SCOPE) # hand the result to the caller's scope
+            unset(BUNDLE_FILE) # drop the helper variables from this function's scope
+            unset(BUNDLE_FILENAME)
+        else ()
+            get_target_property(BF ${ARGV0} BUNDLE_FILE) # read the file path from the target property
+            set(${ARGV1} ${BF} PARENT_SCOPE) # hand the result to the caller's scope
+        endif ()
+    else ()
+        message(FATAL_ERROR "Provided argument is not a CMake target: ${ARGV0}")
+    endif ()
+endfunction ()
+
+
 function(install_bundle)
     message(DEPRECATION "install_bundle is deprecated, use install_celix_bundle instead.")
     install_celix_bundle(${ARGN})
@@ -930,63 +982,6 @@ endforeach()
     endif ()
 endfunction()
 
-#[[
-Get bundle file (absolute path to bundle) from an (imported) bundle
-target taking into account the used CMAKE_BUILD_TYPE and available
-bundle configurations.
-
-celix_get_bundle_file(<bundle_target> VARIABLE_NAME)
-
-Example: celix_get_bundle_file(Celix::shell SHELL_BUNDLE_FILE)
-
-]]
-function(celix_get_bundle_file)
-
-if (TARGET ${ARGV0})
-    get_target_property(_IMP ${ARGV0} BUNDLE_IMPORTED)
-    if (_IMP)
-        _celix_extract_imported_bundle_info(${ARGV0})
-        set(${ARGV1} ${BUNDLE_FILE} PARENT_SCOPE)
-        unset(BUNDLE_FILE)
-        unset(BUNDLE_FILENAME)
-    else ()
-        get_target_property(BF ${ARGV0} BUNDLE_FILE)
-        set(${ARGV1} ${BF} PARENT_SCOPE)
-    endif ()
-else ()
-    message(FATAL_ERROR "Provided argument is not a CMake target: ${ARGV0}")
-endif ()
-
-endfunction ()
-
-#[[
-Get bundle filename from an (imported) bundle target taking into account the
-used CMAKE_BUILD_TYPE and available bundle configurations.
-
-celix_get_bundle_filename(<bundle_target> VARIABLE_NAME)
-
-Example: celix_get_bundle_filename(Celix::shell SHELL_BUNDLE_FILENAME)
-
-]]
-function(celix_get_bundle_filename)
-
-if (TARGET ${ARGV0})
-    get_target_property(_IMP ${ARGV0} BUNDLE_IMPORTED)
-    if (_IMP)
-        _celix_extract_imported_bundle_info(${ARGV0})
-        set(${ARGV1} ${BUNDLE_FILENAME} PARENT_SCOPE)
-    else ()
-        get_target_property(BF ${ARGV0} BUNDLE_FILENAME)
-        set(${ARGV1} ${BF} PARENT_SCOPE)
-    endif ()
-else ()
-    message(FATAL_ERROR "Provided argument is not a CMake target: ${ARGV0}")
-endif ()
-
-endfunction ()
-
-
-
 
 ######################################### "Private" function ###########################################################
 
diff --git a/cmake/cmake_celix/Generic.cmake b/cmake/cmake_celix/Generic.cmake
index e44d3fa..1d13aca 100644
--- a/cmake/cmake_celix/Generic.cmake
+++ b/cmake/cmake_celix/Generic.cmake
@@ -15,13 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-function(install_celix_targets)
-    install_celix_bundle_targets(${ARGN})
-endfunction ()
-
 #[[
-Add bundles as dependencies to a cmake target, so that the bundle zip files are created before the cmake target is
- created.
+Add bundles as dependencies to a cmake target, so that the bundle zip files will be created before the cmake target.
 
 add_celix_bundle_dependencies(<cmake_target>
     bundles...
@@ -41,4 +36,8 @@ function(add_celix_bundle_dependencies)
             endif ()
         endif()
     endforeach()
-endfunction()
\ No newline at end of file
+endfunction()
+
+function(install_celix_targets) # thin wrapper around install_celix_bundle_targets
+    install_celix_bundle_targets(${ARGN}) # forward all arguments unchanged
+endfunction ()
\ No newline at end of file
diff --git a/documents/cmake_commands/README.md b/documents/cmake_commands/README.md
index 92ce35d..6f9a398 100644
--- a/documents/cmake_commands/README.md
+++ b/documents/cmake_commands/README.md
@@ -163,6 +163,34 @@ Set bundle group.
 celix_bundle_group(<bundle_target> bundle group)
 ```
 
+## celix_get_bundle_filename
+Get bundle filename from an (imported) bundle target taking into account the
+used CMAKE_BUILD_TYPE and available bundle configurations. 
+
+```CMake
+celix_get_bundle_filename(<bundle_target> VARIABLE_NAME)
+```
+
+Example: `celix_get_bundle_filename(Celix::shell SHELL_BUNDLE_FILENAME)` will result in `celix_shell.zip` for a `RelWithDebInfo` cmake build type and in `celix_shell-Debug.zip` for a `Debug` cmake build type (if a debug bundle version exists).
+
+## celix_get_bundle_file
+Get bundle file (absolute path to a bundle) from an (imported) bundle target taking into account the used CMAKE_BUILD_TYPE and available bundle configurations.
+
+```CMake
+celix_get_bundle_file(<bundle_target> VARIABLE_NAME)
+```
+
+Example: `celix_get_bundle_file(Celix::shell SHELL_BUNDLE_FILE)`
+
+## add_celix_bundle_dependencies
+Add bundles as dependencies to a cmake target, so that the bundle zip files will be created before the cmake target.
+
+```CMake
+add_celix_bundle_dependencies(<cmake_target>
+    bundles...
+)
+```
+
 ## install_celix_bundle
 Install bundle when 'make install' is executed. 
 Bundles are installed at `<install-prefix>/share/<project_name>/bundles`.

[celix] 05/05: Adds some additional pubsub interceptor integration tests

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit f0cf24484d7fe0902f8bf49081576224048d100c
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jul 26 20:04:09 2021 +0200

    Adds some additional pubsub interceptor integration tests
---
 bundles/pubsub/integration/CMakeLists.txt          |  66 ++++--
 .../gtest/PubSubInterceptorTestSuite.cc            | 245 +++++++++++++++------
 .../v2/src/pubsub_tcp_topic_receiver.c             |  12 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |   1 +
 .../v2/src/pubsub_websocket_topic_receiver.c       |  11 +-
 .../v2/src/pubsub_websocket_topic_sender.c         |   2 +
 .../v2/src/pubsub_zmq_topic_receiver.c             |  13 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |   5 +-
 8 files changed, 264 insertions(+), 91 deletions(-)

diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 8f21fda..eaefde9 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -760,6 +760,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             gtest/PubSubTopicAndScopeIntegrationTestSuite.cc
     )
     target_link_libraries(test_pubsub_topic_and_scope_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main)
+    add_test(NAME test_pubsub_topic_and_scope_integration COMMAND test_pubsub_topic_and_scope_integration)
     setup_target_for_coverage(test_pubsub_topic_and_scope_integration SCAN_DIR ..)
 
     #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
@@ -781,29 +782,68 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     )
 endif ()
 
-if (BUILD_PUBSUB_PSA_ZMQ)
+#[[
+Add an integration test that uses interceptors for a configurable PSA and wire protocol
+
+ARGV0 is test target name
+ARGV1 is PSA target name
+ARGV2 is wire protocol target name
+
+]]
+function(add_celix_interceptors_test_for_psa_and_wire)
+    set(TEST_TARGET_NAME ${ARGV0})
+    set(PSA ${ARGV1})
+    set(WIRE ${ARGV2})
+
     #Test suite to test if pusbub interceptors
-    add_executable(test_pubsub_interceptors_integration
-            gtest/PubSubInterceptorTestSuite.cc
-        )
-    target_link_libraries(test_pubsub_interceptors_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi)
-    target_include_directories(test_pubsub_interceptors_integration PRIVATE src)
-    setup_target_for_coverage(test_pubsub_interceptors_integration SCAN_DIR ..)
+    add_executable(${TEST_TARGET_NAME}
+        gtest/PubSubInterceptorTestSuite.cc
+    )
+    target_link_libraries(${TEST_TARGET_NAME} PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi)
+    target_include_directories(${TEST_TARGET_NAME} PRIVATE src)
+    add_test(NAME ${TEST_TARGET_NAME} COMMAND ${TEST_TARGET_NAME})
+    setup_target_for_coverage(${TEST_TARGET_NAME} SCAN_DIR ..)
 
-    #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
+    #configure topology manager and pubsub admin, json serializer and wire protocol bundles
     celix_get_bundle_file(Celix::celix_pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
     celix_get_bundle_file(Celix::celix_pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::celix_pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::celix_pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
+    celix_get_bundle_file(${PSA} PUBSUB_PSA_BUNDLE_FILE)
+    celix_get_bundle_file(${WIRE} PUBSUB_WIRE_BUNDLE_FILE)
+
     celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
-    target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
+    add_celix_bundle_dependencies(${TEST_TARGET_NAME} Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager ${PSA} ${WIRE} pubsub_sut pubsub_tst)
+    target_compile_definitions(${TEST_TARGET_NAME} PRIVATE
             PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
             PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
-            PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}"
+            PUBSUB_PSA_BUNDLE_FILE="${PUBSUB_PSA_BUNDLE_FILE}"
             PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}"
             PUBSUB_PUBLISHER_BUNDLE_FILE="${PUBSUB_PUBLISHER_BUNDLE_FILE}"
             PUBSUB_SUBSCRIBER_BUNDLE_FILE="${PUBSUB_SUBSCRIBER_BUNDLE_FILE}"
     )
+
+    #if PSA websocket is enabled add http_admin bundle
+    if (BUILD_PUBSUB_PSA_WS)
+        target_link_libraries(${TEST_TARGET_NAME} PRIVATE Celix::http_admin_api)
+        celix_get_bundle_file(Celix::http_admin HTTP_ADMIN_BUNDLE_FILE)
+        add_celix_bundle_dependencies(${TEST_TARGET_NAME} Celix::http_admin)
+        target_compile_definitions(${TEST_TARGET_NAME} PRIVATE HTTP_ADMIN_BUNDLE_FILE="${HTTP_ADMIN_BUNDLE_FILE}")
+    endif ()
+endfunction()
+
+
+if (BUILD_PUBSUB_PSA_WS)
+    add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_ws_and_wire_v1_integration Celix::celix_pubsub_admin_websocket_v2 Celix::celix_pubsub_protocol_wire_v1)
+    add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_ws_and_wire_v2_integration Celix::celix_pubsub_admin_websocket_v2 Celix::celix_pubsub_protocol_wire_v2)
+endif ()
+
+if (BUILD_PUBSUB_PSA_TCP)
+    message(STATUS "TODO enable tcp and interceptors. Currently has a memleak")
+    #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
+    #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
+endif ()
+
+if (BUILD_PUBSUB_PSA_ZMQ)
+    add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_zmq_and_wire_v1_integration Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1)
+    add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_zmq_and_wire_v2_integration Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v2)
 endif ()
\ No newline at end of file
diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
index 8f500f5..21cbdc3 100644
--- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
+++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
@@ -19,96 +19,217 @@
 
 #include <gtest/gtest.h>
 
+#include <memory>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+
 #include "pubsub_serializer_handler.h"
 #include "celix/FrameworkFactory.h"
 #include "msg.h"
 #include "pubsub_interceptor.h"
 
+struct TestData {
+    TestData(const std::shared_ptr<celix::BundleContext>& ctx) {
+        serHandler = std::shared_ptr<pubsub_serializer_handler>{pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true), [](pubsub_serializer_handler_t* h) {
+            pubsub_serializerHandler_destroy(h);
+        }};
+    }
+
+    std::shared_ptr<pubsub_serializer_handler_t> serHandler{};
+
+    std::mutex mutex{}; //protects below
+    int preSendCount{0};
+    int postSendCount{0};
+    int preReceiveCount{0};
+    int postReceiveCount{0};
+    std::condition_variable cond{};
+};
+
+static void serializeAndPrint(TestData* testData, uint32_t msgId, const void *msg) {
+    struct iovec* vec = nullptr;
+    size_t vecLen = 0;
+    pubsub_serializerHandler_serialize(testData->serHandler.get(), msgId, msg, &vec, &vecLen);
+    if (vecLen > 0) {
+        for (size_t i = 0; i < vecLen; ++i) {
+            fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout);
+        }
+    }
+    fputc('\n', stdout);
+    pubsub_serializerHandler_freeSerializedMsg(testData->serHandler.get(), msgId, vec, vecLen);
+}
+
 class PubSubInterceptorTestSuite : public ::testing::Test {
 public:
     PubSubInterceptorTestSuite() {
         fw = celix::createFramework({
-            {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"} /*TODO memleak in pubsub zmq v2 when metadata is empty*/
+            {"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "info"},
+            {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"}
         });
         ctx = fw->getFrameworkBundleContext();
+        testData = std::make_shared<TestData>(ctx);
 
         EXPECT_GE(ctx->installBundle(PUBSUB_JSON_BUNDLE_FILE), 0);
         EXPECT_GE(ctx->installBundle(PUBSUB_TOPMAN_BUNDLE_FILE), 0);
-        EXPECT_GE(ctx->installBundle(PUBSUB_ZMQ_BUNDLE_FILE), 0);
+        EXPECT_GE(ctx->installBundle(PUBSUB_PSA_BUNDLE_FILE), 0);
         EXPECT_GE(ctx->installBundle(PUBSUB_WIRE_BUNDLE_FILE), 0);
+#ifdef HTTP_ADMIN_BUNDLE_FILE
+        EXPECT_GE(ctx->installBundle(HTTP_ADMIN_BUNDLE_FILE), 0);
+#endif
     }
 
-    std::shared_ptr<celix::Framework> fw{};
-    std::shared_ptr<celix::BundleContext> ctx{};
-};
+    std::shared_ptr<celix::ServiceRegistration> createInterceptor(bool cancelSend, bool cancelReceive) {
+        auto interceptor = std::make_shared<pubsub_interceptor>(pubsub_interceptor{});
+        interceptor->handle = (void*)testData.get();
+        interceptor->preSend  = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                   const void *, celix_properties_t* metadata) {
+            auto* td = (TestData*)handle;
+            celix_properties_set(metadata, "test", "preSend");
 
-static void serializeAndPrint(pubsub_serializer_handler_t* ser, uint32_t msgId, const void *msg) {
-    struct iovec* vec = nullptr;
-    size_t vecLen = 0;
-    pubsub_serializerHandler_serialize(ser, msgId, msg, &vec, &vecLen);
-    if (vecLen > 0) {
-        for (size_t i = 0; i < vecLen; ++i) {
-            fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout);
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->preSendCount += 1;
+            td->cond.notify_all();
+            return true;
+        };
+        if (cancelSend) {
+            interceptor->preSend = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                      const void *, celix_properties_t*) {
+                auto* td = (TestData*)handle;
+                std::lock_guard<std::mutex> lck{td->mutex};
+                td->preSendCount += 1;
+                td->cond.notify_all();
+                return false;
+            };
+        }
+        interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+                                   const celix_properties_t* metadata) {
+            auto* td = (TestData*)handle;
+            serializeAndPrint(td, msgId, rawMsg);
+            EXPECT_STREQ(msgType, "msg");
+            const auto *msg = static_cast<const msg_t*>(rawMsg);
+            EXPECT_GE(msg->seqNr, 0);
+            EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend");
+            const char *key;
+            CELIX_PROPERTIES_FOR_EACH(metadata, key) {
+                printf("got property %s=%s\n", key, celix_properties_get(metadata, key, nullptr));
+            }
+            fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr);
+
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->postSendCount += 1;
+            td->cond.notify_all();
+        };
+        interceptor->preReceive = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                     const void *, celix_properties_t* metadata) {
+            auto* td = (TestData*)handle;
+            celix_properties_set(metadata, "test", "preReceive");
+
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->preReceiveCount += 1;
+            td->cond.notify_all();
+            return true;
+        };
+        if (cancelReceive) {
+            interceptor->preReceive = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                         const void *, celix_properties_t*) {
+                auto* td = (TestData*)handle;
+                std::lock_guard<std::mutex> lck{td->mutex};
+                td->preReceiveCount += 1;
+                td->cond.notify_all();
+                return false;
+            };
         }
+        interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+                                      const celix_properties_t* metadata) {
+            auto* td = (TestData*)handle;
+            serializeAndPrint(td, msgId, rawMsg);
+            EXPECT_STREQ(msgType, "msg");
+            const auto *msg = static_cast<const msg_t*>(rawMsg);
+            EXPECT_GE(msg->seqNr, 0);
+            EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive");
+            fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr);
+
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->postReceiveCount += 1;
+            td->cond.notify_all();
+        };
+        //note registering identical services to validate multiple interceptors
+        return ctx->registerService<pubsub_interceptor>(std::move(interceptor), PUBSUB_INTERCEPTOR_SERVICE_NAME)
+                .setUnregisterAsync(false) //note to ensure test data is still valid when service is registered
+                .build();
     }
-    fputc('\n', stdout);
-    pubsub_serializerHandler_freeSerializedMsg(ser, msgId, vec, vecLen);
-}
 
-std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<celix::BundleContext>& ctx) {
-    auto interceptor = std::shared_ptr<pubsub_interceptor>{new pubsub_interceptor{}, [](pubsub_interceptor* inter) {
-        auto* handler = (pubsub_serializer_handler_t*)inter->handle;
-        pubsub_serializerHandler_destroy(handler);
-        delete inter;
-    }};
-    interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true);
-    interceptor->preSend  = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
-                               const void *, celix_properties_t* metadata) {
-        celix_properties_set(metadata, "test", "preSend");
-        return true;
-    };
-    interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
-                               const celix_properties_t* metadata) {
-        auto* ser = (pubsub_serializer_handler_t*)handle;
-        serializeAndPrint(ser, msgId, rawMsg);
-        EXPECT_STREQ(msgType, "msg");
-        const auto *msg = static_cast<const msg_t*>(rawMsg);
-        EXPECT_GE(msg->seqNr, 0);
-        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend");
-        fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr);
-    };
-    interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
-                                 const void *, celix_properties_t* metadata) {
-        celix_properties_set(metadata, "test", "preReceive");
-        return true;
-    };
-    interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
-                                  const celix_properties_t* metadata) {
-        auto* ser = (pubsub_serializer_handler_t*)handle;
-        serializeAndPrint(ser, msgId, rawMsg);
-        EXPECT_STREQ(msgType, "msg");
-        const auto *msg = static_cast<const msg_t*>(rawMsg);
-        EXPECT_GE(msg->seqNr, 0);
-        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive");
-        fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr);
-    };
-    //note registering identical services to validate multiple interceptors
-    return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build();
-}
+    std::shared_ptr<celix::Framework> fw{};
+    std::shared_ptr<celix::BundleContext> ctx{};
+    std::shared_ptr<TestData> testData{};
+};
 
 TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleReceivers) {
     //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    //And a registered interceptor
+    //And several registered interceptors
     //Then the interceptor receives a correct msg type.
 
+    auto reg1 = createInterceptor(false, false);
+    auto reg2 = createInterceptor(false, false);
+    auto reg3 = createInterceptor(false, false);
+    ctx->waitForEvents();
+
     EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
     EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
 
-    auto reg1 = createInterceptor(ctx);
-    auto reg2 = createInterceptor(ctx);
-    auto reg3 = createInterceptor(ctx);
+    std::unique_lock<std::mutex> lck{testData->mutex};
+    auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, [this]{
+        return  testData->preSendCount > 10 &&
+                testData->postSendCount > 10 &&
+                testData->preReceiveCount > 10 &&
+                testData->postReceiveCount > 10;
+    });
 
-    //TODO stop after a certain amount of messages send
-    //TODO also test with tcp v2.
-    sleep(5);
+    EXPECT_TRUE(isTestDone);
 }
+
+TEST_F(PubSubInterceptorTestSuite, InterceptorWithPreSendCancelWillPreventSends) {
+    //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE)
+    //And an interceptor which cancels a send
+    //Then only the preSend count will be increased, but the rest of the counts will be 0
+
+    auto reg1 = createInterceptor(true, false);
+    ctx->waitForEvents();
+
+    EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
+    EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
+
+    std::unique_lock<std::mutex> lck{testData->mutex};
+    auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, [this]{
+        return  testData->preSendCount > 10 ;
+    });
+
+    EXPECT_EQ(testData->postSendCount, 0);
+    EXPECT_EQ(testData->preReceiveCount, 0);
+    EXPECT_EQ(testData->postReceiveCount, 0);
+
+    EXPECT_TRUE(isTestDone);
+}
+
+TEST_F(PubSubInterceptorTestSuite, InterceptorWithPreRedeiveCancelWillPreventPostReceive) {
+    //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE)
+    //And an interceptor which cancels a receive
+    //Then the preSend, postSend and preReceive count will be increased, but the postReceive count will be 0
+
+    auto reg1 = createInterceptor(false, true);
+    ctx->waitForEvents();
+
+    EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
+    EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
+
+    std::unique_lock<std::mutex> lck{testData->mutex};
+    auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, [this]{
+        return  testData->preSendCount > 10 &&
+                testData->postSendCount > 10 &&
+                testData->preReceiveCount > 10;
+    });
+
+    EXPECT_EQ(testData->postReceiveCount, 0);
+
+    EXPECT_TRUE(isTestDone);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index e9ef6b4..fe6fd53 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -40,6 +40,8 @@
 #define UUID_STR_LEN  37
 #endif
 
+#define L_TRACE(...) \
+    celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_TRACE, __VA_ARGS__)
 #define L_DEBUG(...) \
     celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -486,13 +488,15 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
         if (status == CELIX_SUCCESS) {
             celix_properties_t *metadata = message->metadata.metadata;
             bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata);
+            bool release = true;
             if (cont) {
-                bool release;
                 callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
                 pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
-                if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
-                }
+            } else {
+                L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", msgFqn);
+            }
+            if (release) {
+                pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
             }
         } else {
             L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 3c58e84..a817a3e 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -374,6 +374,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
     bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
     if (!cont) {
         L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        celix_properties_destroy(metadata);
         return status;
     }
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index 56f4008..7421b09 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -491,14 +491,17 @@ static void processJsonMsg(pubsub_websocket_topic_receiver_t *receiver, const pu
             celix_properties_t *metadata = NULL; //NOTE metadata not supported for websocket
             bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, header->fqn, msgId,
                                                                   deserializedMsg, &metadata);
+            bool release = true;
             if (cont) {
-                bool release;
                 callReceivers(receiver, msgId, header, payload, payloadSize, &deserializedMsg, &release, metadata);
                 pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, header->fqn, msgId, deserializedMsg, metadata);
-                if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg);
-                }
+            } else {
+                L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", header->fqn);
+            }
+            if (release) {
+                pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg);
             }
+            celix_properties_destroy(metadata);
         } else {
             L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index e435093..7ecf546 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -270,6 +270,7 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
     bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
     if (!cont) {
         L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        celix_properties_destroy(metadata);
         return status;
     }
 
@@ -316,6 +317,7 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
     }
 
     pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+    celix_properties_destroy(metadata);
 
     return status;
 }
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index f5e70b0..90d93cb 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -51,7 +51,8 @@
 #define UUID_STR_LEN 37
 #endif
 
-
+#define L_TRACE(...) \
+    celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_TRACE, __VA_ARGS__)
 #define L_DEBUG(...) \
     celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -468,13 +469,15 @@ static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_prot
         if (status == CELIX_SUCCESS) {
             celix_properties_t *metadata = message->metadata.metadata;
             bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, &metadata);
+            bool release = true;
             if (cont) {
-                bool release;
                 callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata);
                 pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, metadata);
-                if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
-                }
+            } else {
+                L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", msgFqn);
+            }
+            if (release) {
+                pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
             }
         } else {
             L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 155cb19..64c3b41 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -410,6 +410,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
     if (!cont) {
         L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        celix_properties_destroy(metadata);
         return status;
     }
 
@@ -541,10 +542,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
     }
     __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE);
     pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+    celix_properties_destroy(metadata);
 
-    if (message.metadata.metadata) {
-        celix_properties_destroy(message.metadata.metadata);
-    }
     if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) {
         pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
     }