You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/06/28 19:42:51 UTC

[celix] branch feature/pubsub-interceptor-fix created (now 6229f9e)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a change to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at 6229f9e  Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.

This branch includes the following new commits:

     new 2f0d2f8  renamed pubsub/test to pubsub/integration
     new 0da5adc  Refactors pubsub integration sources setup
     new fc71742  Removes unused pubsub mock stuff
     new 3997906  Adds missing ifdef c++ checks and update interceptor handle to not create an empty metadata if not present on the wire.
     new 10117c9  Adds initial PubSubInterceptorTestSuite
     new 461a2ce  Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1.
     new 6229f9e  Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[celix] 05/07: Adds initial PubSubInterceptorTestSuite

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 10117c98dd32861eca75dfc6f261d7f710f0957a
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Sun Jun 27 16:40:02 2021 +0200

    Adds initial PubSubInterceptorTestSuite
---
 .../gtest/PubSubInterceptorTestSuite.cc            | 100 +++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
new file mode 100644
index 0000000..181eff3
--- /dev/null
+++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "pubsub_serializer_handler.h"
+#include "celix/FrameworkFactory.h"
+#include "msg.h"
+#include "pubsub_interceptor.h"
+
+class PubSubInterceptorTestSuite : public ::testing::Test {
+public:
+    PubSubInterceptorTestSuite() {
+        fw = celix::createFramework({
+            {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"} /*TODO memleak in pubsub zmq v2 when metadata is empty*/
+        });
+        ctx = fw->getFrameworkBundleContext();
+
+        EXPECT_GE(ctx->installBundle(PUBSUB_JSON_BUNDLE_FILE), 0);
+        EXPECT_GE(ctx->installBundle(PUBSUB_TOPMAN_BUNDLE_FILE), 0);
+        EXPECT_GE(ctx->installBundle(PUBSUB_ZMQ_BUNDLE_FILE), 0);
+        EXPECT_GE(ctx->installBundle(PUBSUB_WIRE_BUNDLE_FILE), 0);
+    }
+
+    std::shared_ptr<celix::Framework> fw{};
+    std::shared_ptr<celix::BundleContext> ctx{};
+};
+
+static void serializeAndPrint(pubsub_serializer_handler_t* ser, uint32_t msgId, const void *msg) {
+    struct iovec* vec = nullptr;
+    size_t vecLen = 0;
+    pubsub_serializerHandler_serialize(ser, msgId, msg, &vec, &vecLen);
+    if (vecLen > 0) {
+        for (size_t i = 0; i < vecLen; ++i) {
+            fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout);
+        }
+    }
+    fputc('\n', stdout);
+    pubsub_serializerHandler_freeSerializedMsg(ser, msgId, vec, vecLen);
+}
+
+std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<celix::BundleContext>& ctx) {
+    auto interceptor = std::shared_ptr<pubsub_interceptor>{new pubsub_interceptor{}, [](pubsub_interceptor* inter) {
+        auto* handler = (pubsub_serializer_handler_t*)inter->handle;
+        pubsub_serializerHandler_destroy(handler);
+        delete inter;
+    }};
+    interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true);
+    interceptor->postSend = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
+                               const celix_properties_t *) {
+        auto* ser = (pubsub_serializer_handler_t*)handle;
+        serializeAndPrint(ser, msgId, rawMsg);
+        EXPECT_STREQ(msgType, "msg");
+        const auto *msg = static_cast<const msg_t*>(rawMsg);
+        EXPECT_GE(msg->seqNr, 0);
+        fprintf(stdout, "Got message in postSend interceptor %p with seq nr %i\n", handle, msg->seqNr);
+    };
+    interceptor->postReceive = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
+                                  const celix_properties_t *) {
+        auto* ser = (pubsub_serializer_handler_t*)handle;
+        serializeAndPrint(ser, msgId, rawMsg);
+        EXPECT_STREQ(msgType, "msg");
+        const auto *msg = static_cast<const msg_t*>(rawMsg);
+        EXPECT_GE(msg->seqNr, 0);
+        fprintf(stdout, "Got message in postReceive interceptor %p with seq nr %i\n", handle, msg->seqNr);
+    };
+    //note registering identical services to validate multiple interceptors
+    return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build();
+}
+
+TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleReceivers) {
+    //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE)
+    //And a registered interceptor
+    //Then the interceptor receives a correct msg type.
+
+    EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
+    EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
+
+    auto reg1 = createInterceptor(ctx);
+    auto reg2 = createInterceptor(ctx);
+    auto reg3 = createInterceptor(ctx);
+
+    sleep(5);
+}

[celix] 02/07: Refactors pubsub integration sources setup

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 0da5adc77eb40974321c916e0758dd46f0c6a5a4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Jun 25 19:07:12 2021 +0200

    Refactors pubsub integration sources setup
---
 bundles/pubsub/integration/CMakeLists.txt          | 120 ++++++++++-----------
 .../PubSubEndpointIntegrationTestSuite.cc}         |   0
 .../PubSubIntegrationTestSuite.cc}                 |   0
 .../PubSubTopicAndScopeIntegrationTestSuite.cc     |   0
 .../integration/{test => src}/loopback_activator.c |   0
 bundles/pubsub/integration/{test => src}/msg.h     |   0
 .../{test => src}/receive_count_service.h          |   0
 .../{test => src}/serializer_activator.cc          |   0
 .../integration/{test => src}/sut_activator.c      |   0
 .../{test => src}/sut_endpoint_activator.c         |   0
 .../integration/{test => src}/tst_activator.c      |   0
 .../{test => src}/tst_endpoint_activator.c         |   0
 12 files changed, 60 insertions(+), 60 deletions(-)

diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 260c7e7..3cda520 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -20,10 +20,10 @@ find_package(Jansson REQUIRED)
 add_celix_bundle(pubsub_endpoint_sut
         #"Vanilla" bundle which is under test
         SOURCES
-        test/sut_endpoint_activator.c
+        src/sut_endpoint_activator.c
         VERSION 1.0.0
         )
-target_include_directories(pubsub_endpoint_sut PRIVATE test)
+target_include_directories(pubsub_endpoint_sut PRIVATE src)
 target_link_libraries(pubsub_endpoint_sut PRIVATE Celix::pubsub_api)
 celix_bundle_files(pubsub_endpoint_sut
         meta_data/msg.descriptor
@@ -37,7 +37,7 @@ celix_bundle_files(pubsub_endpoint_sut
 add_celix_bundle(pubsub_endpoint_tst
         #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
         SOURCES
-        test/tst_endpoint_activator.c
+        src/tst_endpoint_activator.c
         VERSION 1.0.0
         )
 target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -54,10 +54,10 @@ celix_bundle_files(pubsub_endpoint_tst
 add_celix_bundle(pubsub_loopback
         #"Vanilla" bundle which is under test
         SOURCES
-        test/loopback_activator.c
+        src/loopback_activator.c
         VERSION 1.0.0
         )
-target_include_directories(pubsub_loopback PRIVATE test)
+target_include_directories(pubsub_loopback PRIVATE src)
 target_link_libraries(pubsub_loopback PRIVATE Celix::pubsub_api)
 celix_bundle_files(pubsub_loopback
         meta_data/msg.descriptor
@@ -75,10 +75,10 @@ celix_bundle_files(pubsub_loopback
 add_celix_bundle(pubsub_sut
     #"Vanilla" bundle which is under test
     SOURCES
-        test/sut_activator.c
+        src/sut_activator.c
     VERSION 1.0.0
 )
-target_include_directories(pubsub_sut PRIVATE test)
+target_include_directories(pubsub_sut PRIVATE src)
 target_link_libraries(pubsub_sut PRIVATE Celix::pubsub_api)
 celix_bundle_files(pubsub_sut
     meta_data/msg.descriptor
@@ -92,7 +92,7 @@ celix_bundle_files(pubsub_sut
 add_celix_bundle(pubsub_tst
     #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
     SOURCES
-        test/tst_activator.c
+        src/tst_activator.c
     VERSION 1.0.0
 )
 target_link_libraries(pubsub_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -108,7 +108,7 @@ celix_bundle_files(pubsub_tst
 add_celix_bundle(pubsub_deadlock_sut
     #"Vanilla" bundle which is used to trigger a publisher added call
     SOURCES
-    test/sut_activator.c
+    src/sut_activator.c
     VERSION 1.0.0
 )
 celix_bundle_files(pubsub_deadlock_sut
@@ -130,16 +130,16 @@ celix_get_bundle_file(pubsub_deadlock_sut DEADLOCK_SUT_BUNDLE_FILE)
 add_celix_bundle(pubsub_serializer
         #serializer bundle
         SOURCES
-        test/serializer_activator.cc
+        src/serializer_activator.cc
         VERSION 1.0.0
         )
-target_include_directories(pubsub_serializer PRIVATE test)
+target_include_directories(pubsub_serializer PRIVATE src)
 target_link_libraries(pubsub_serializer PRIVATE Celix::pubsub_api Celix::pubsub_spi)
 
 if (BUILD_PUBSUB_PSA_UDP_MC)
     add_celix_container(pubsub_udpmc_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -152,7 +152,7 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
                 pubsub_tst
     )
     target_link_libraries(pubsub_udpmc_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_udpmc_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_udpmc_tests SYSTEM PRIVATE src)
 
     add_celix_container(pstm_deadlock_udpmc_test
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -177,8 +177,8 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
     add_dependencies(pstm_deadlock_udpmc_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_udpmc_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_udpmc_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_udpmc_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_udpmc_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_udpmc_test COMMAND pstm_deadlock_udpmc_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_udpmc_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_udpmc_test SCAN_DIR ..)
@@ -191,7 +191,7 @@ endif()
 if (BUILD_PUBSUB_PSA_TCP)
     add_celix_container(pubsub_tcp_wire_v1_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -207,13 +207,13 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_tst
             )
     target_link_libraries(pubsub_tcp_wire_v1_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_wire_v1_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_wire_v1_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_tcp_wire_v1_tests COMMAND pubsub_tcp_wire_v1_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v1_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_tcp_wire_v1_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_wire_v2_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -229,13 +229,13 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_tst
             )
     target_link_libraries(pubsub_tcp_wire_v2_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_wire_v2_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_wire_v2_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_tcp_wire_v2_tests COMMAND pubsub_tcp_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v2_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_tcp_wire_v2_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_wire_v2_with_no_scope_tests
         USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
         DIR ${CMAKE_CURRENT_BINARY_DIR}
         PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -252,13 +252,13 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_tst
     )
     target_link_libraries(pubsub_tcp_wire_v2_with_no_scope_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_wire_v2_with_no_scope_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_wire_v2_with_no_scope_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_tcp_wire_v2_with_no_scope_tests COMMAND pubsub_tcp_wire_v2_with_no_scope_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v2_with_no_scope_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_tcp_wire_v2_with_no_scope_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_endpoint_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_endpoint_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubEndpointIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -275,7 +275,7 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_loopback
             )
     target_link_libraries(pubsub_tcp_endpoint_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_endpoint_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_endpoint_tests SYSTEM PRIVATE src)
 
     add_celix_container(pstm_deadlock_tcp_test
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -300,8 +300,8 @@ if (BUILD_PUBSUB_PSA_TCP)
     add_dependencies(pstm_deadlock_tcp_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_tcp_test COMMAND pstm_deadlock_tcp_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_tcp_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_tcp_test SCAN_DIR ..)
@@ -316,7 +316,7 @@ if (BUILD_PUBSUB_PSA_TCP)
 
     add_celix_container(pubsub_tcp_v2_wire_v1_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -333,13 +333,13 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_serializer
             )
     target_link_libraries(pubsub_tcp_v2_wire_v1_tests PRIVATE Celix::pubsub_api Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_v2_wire_v1_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_v2_wire_v1_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_tcp_v2_wire_v1_tests COMMAND pubsub_tcp_v2_wire_v1_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_v2_wire_v1_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_tcp_v2_wire_v1_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_v2_wire_v2_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -356,13 +356,13 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_serializer
             )
     target_link_libraries(pubsub_tcp_v2_wire_v2_tests PRIVATE Celix::pubsub_api Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_v2_wire_v2_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_v2_wire_v2_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_tcp_v2_wire_v2_tests COMMAND pubsub_tcp_v2_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_v2_wire_v2_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_tcp_v2_wire_v2_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_v2_wire_v2_with_no_scope_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -380,13 +380,13 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_serializer
             )
     target_link_libraries(pubsub_tcp_v2_wire_v2_with_no_scope_tests PRIVATE Celix::pubsub_api Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_v2_wire_v2_with_no_scope_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_v2_wire_v2_with_no_scope_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_tcp_v2_wire_v2_with_no_scope_tests COMMAND pubsub_tcp_v2_wire_v2_with_no_scope_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_v2_wire_v2_with_no_scope_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_tcp_v2_wire_v2_with_no_scope_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_v2_endpoint_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_endpoint_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubEndpointIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -404,7 +404,7 @@ if (BUILD_PUBSUB_PSA_TCP)
             pubsub_serializer
             )
     target_link_libraries(pubsub_tcp_v2_endpoint_tests PRIVATE Celix::pubsub_api Celix::dfi GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_tcp_v2_endpoint_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_tcp_v2_endpoint_tests SYSTEM PRIVATE src)
 
     add_celix_container(pstm_deadlock_tcp_v2_test
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -430,8 +430,8 @@ if (BUILD_PUBSUB_PSA_TCP)
     add_dependencies(pstm_deadlock_tcp_v2_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_tcp_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_tcp_v2_test COMMAND pstm_deadlock_tcp_v2_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_tcp_v2_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_tcp_v2_test SCAN_DIR ..)
@@ -446,7 +446,7 @@ endif()
 if (BUILD_PUBSUB_PSA_WS)
     add_celix_container(pubsub_websocket_tests
             USE_CONFIG
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -462,7 +462,7 @@ if (BUILD_PUBSUB_PSA_WS)
             pubsub_tst
             )
     target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi civetweb_shared GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_websocket_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_websocket_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_websocket_tests SCAN_DIR ..)
 
@@ -489,15 +489,15 @@ if (BUILD_PUBSUB_PSA_WS)
     add_dependencies(pstm_deadlock_websocket_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_websocket_test COMMAND pstm_deadlock_websocket_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_websocket_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_websocket_test SCAN_DIR ..)
 
     add_celix_container(pubsub_websocket_v2_tests
             USE_CONFIG
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -514,7 +514,7 @@ if (BUILD_PUBSUB_PSA_WS)
             pubsub_serializer
             )
     target_link_libraries(pubsub_websocket_v2_tests PRIVATE Celix::pubsub_api Jansson civetweb_shared GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_websocket_v2_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_websocket_v2_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_websocket_v2_tests COMMAND pubsub_websocket_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_v2_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_websocket_v2_tests SCAN_DIR ..)
 
@@ -541,8 +541,8 @@ if (BUILD_PUBSUB_PSA_WS)
     add_dependencies(pstm_deadlock_websocket_v2_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_websocket_v2_test COMMAND pstm_deadlock_websocket_v2_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_websocket_v2_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_websocket_v2_test SCAN_DIR ..)
@@ -554,7 +554,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 
     add_celix_container(pubsub_zmq_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -571,7 +571,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 
     add_celix_container(pubsub_zmq_v2_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -587,18 +587,18 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             )
 
     target_link_libraries(pubsub_zmq_tests PRIVATE Celix::pubsub_api Celix::dfi ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_zmq_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_zmq_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_zmq_tests COMMAND pubsub_zmq_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_tests SCAN_DIR ..)
 
     target_link_libraries(pubsub_zmq_v2_tests PRIVATE Celix::pubsub_api Celix::dfi ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_zmq_v2_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_zmq_v2_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_zmq_v2_tests COMMAND pubsub_zmq_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_v2_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_v2_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_zmq_wire_v2_tests
         USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
         DIR ${CMAKE_CURRENT_BINARY_DIR}
         PROPERTIES
         LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -614,14 +614,14 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         )
 
     target_link_libraries(pubsub_zmq_wire_v2_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_zmq_wire_v2_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_zmq_wire_v2_tests SYSTEM PRIVATE src)
     add_test(NAME pubsub_zmq_wire_v2_tests COMMAND pubsub_zmq_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_wire_v2_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_wire_v2_tests SCAN_DIR ..)
 
 
     add_celix_container(pubsub_zmq_zerocopy_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -641,7 +641,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 
     add_celix_container(pubsub_zmq_v2_zerocopy_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -659,20 +659,20 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 pubsub_serializer
             )
     target_link_libraries(pubsub_zmq_zerocopy_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_zmq_zerocopy_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_zmq_zerocopy_tests SYSTEM PRIVATE src)
 
     add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_zerocopy_tests SCAN_DIR ..)
 
     target_link_libraries(pubsub_zmq_v2_zerocopy_tests PRIVATE Celix::pubsub_api Celix::dfi ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_zmq_v2_zerocopy_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_zmq_v2_zerocopy_tests SYSTEM PRIVATE src)
 
     add_test(NAME pubsub_zmq_v2_zerocopy_tests COMMAND pubsub_zmq_v2_zerocopy_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_v2_zerocopy_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_v2_zerocopy_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_zmq_zerocopy_wire_v2_tests
         USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
-        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
         DIR ${CMAKE_CURRENT_BINARY_DIR}
         PROPERTIES
         LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
@@ -690,7 +690,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         pubsub_serializer
         )
     target_link_libraries(pubsub_zmq_zerocopy_wire_v2_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
-    target_include_directories(pubsub_zmq_zerocopy_wire_v2_tests SYSTEM PRIVATE test)
+    target_include_directories(pubsub_zmq_zerocopy_wire_v2_tests SYSTEM PRIVATE src)
 
     add_test(NAME pubsub_zmq_zerocopy_wire_v2_tests COMMAND pubsub_zmq_zerocopy_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_wire_v2_tests,CONTAINER_LOC>)
     setup_target_for_coverage(pubsub_zmq_zerocopy_wire_v2_tests SCAN_DIR ..)
@@ -733,8 +733,8 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     add_dependencies(pstm_deadlock_zmq_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_zmq_test COMMAND pstm_deadlock_zmq_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_zmq_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_zmq_test SCAN_DIR ..)
@@ -747,8 +747,8 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     add_dependencies(pstm_deadlock_zmq_v2_test pubsub_deadlock_sut_bundle)
 
     #Framework "bundle" has no cache dir. Default as "cache dir" the cwd is used.
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_v2_test/META-INF/descriptors/msg.descriptor COPYONLY)
-    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_v2_test/META-INF/topics/pub/deadlock.properties COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_v2_src/META-INF/descriptors/msg.descriptor COPYONLY)
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_zmq_v2_src/META-INF/topics/pub/deadlock.properties COPYONLY)
 
     add_test(NAME pstm_deadlock_zmq_v2_test COMMAND pstm_deadlock_zmq_v2_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_zmq_v2_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_zmq_v2_test SCAN_DIR ..)
@@ -758,7 +758,7 @@ endif ()
 if (BUILD_PUBSUB_PSA_ZMQ)
     #Test suite to test if component with same topic and different scope combinations work
     add_executable(test_pubsub_topic_and_scope_integration
-            topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc
+            gtest/PubSubTopicAndScopeIntegrationTestSuite.cc
     )
     target_link_libraries(test_pubsub_topic_and_scope_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main)
     setup_target_for_coverage(test_pubsub_topic_and_scope_integration SCAN_DIR ..)
diff --git a/bundles/pubsub/integration/test/test_endpoint_runner.cc b/bundles/pubsub/integration/gtest/PubSubEndpointIntegrationTestSuite.cc
similarity index 100%
rename from bundles/pubsub/integration/test/test_endpoint_runner.cc
rename to bundles/pubsub/integration/gtest/PubSubEndpointIntegrationTestSuite.cc
diff --git a/bundles/pubsub/integration/test/test_runner.cc b/bundles/pubsub/integration/gtest/PubSubIntegrationTestSuite.cc
similarity index 100%
rename from bundles/pubsub/integration/test/test_runner.cc
rename to bundles/pubsub/integration/gtest/PubSubIntegrationTestSuite.cc
diff --git a/bundles/pubsub/integration/topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubTopicAndScopeIntegrationTestSuite.cc
similarity index 100%
rename from bundles/pubsub/integration/topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc
rename to bundles/pubsub/integration/gtest/PubSubTopicAndScopeIntegrationTestSuite.cc
diff --git a/bundles/pubsub/integration/test/loopback_activator.c b/bundles/pubsub/integration/src/loopback_activator.c
similarity index 100%
rename from bundles/pubsub/integration/test/loopback_activator.c
rename to bundles/pubsub/integration/src/loopback_activator.c
diff --git a/bundles/pubsub/integration/test/msg.h b/bundles/pubsub/integration/src/msg.h
similarity index 100%
rename from bundles/pubsub/integration/test/msg.h
rename to bundles/pubsub/integration/src/msg.h
diff --git a/bundles/pubsub/integration/test/receive_count_service.h b/bundles/pubsub/integration/src/receive_count_service.h
similarity index 100%
rename from bundles/pubsub/integration/test/receive_count_service.h
rename to bundles/pubsub/integration/src/receive_count_service.h
diff --git a/bundles/pubsub/integration/test/serializer_activator.cc b/bundles/pubsub/integration/src/serializer_activator.cc
similarity index 100%
rename from bundles/pubsub/integration/test/serializer_activator.cc
rename to bundles/pubsub/integration/src/serializer_activator.cc
diff --git a/bundles/pubsub/integration/test/sut_activator.c b/bundles/pubsub/integration/src/sut_activator.c
similarity index 100%
rename from bundles/pubsub/integration/test/sut_activator.c
rename to bundles/pubsub/integration/src/sut_activator.c
diff --git a/bundles/pubsub/integration/test/sut_endpoint_activator.c b/bundles/pubsub/integration/src/sut_endpoint_activator.c
similarity index 100%
rename from bundles/pubsub/integration/test/sut_endpoint_activator.c
rename to bundles/pubsub/integration/src/sut_endpoint_activator.c
diff --git a/bundles/pubsub/integration/test/tst_activator.c b/bundles/pubsub/integration/src/tst_activator.c
similarity index 100%
rename from bundles/pubsub/integration/test/tst_activator.c
rename to bundles/pubsub/integration/src/tst_activator.c
diff --git a/bundles/pubsub/integration/test/tst_endpoint_activator.c b/bundles/pubsub/integration/src/tst_endpoint_activator.c
similarity index 100%
rename from bundles/pubsub/integration/test/tst_endpoint_activator.c
rename to bundles/pubsub/integration/src/tst_endpoint_activator.c

[celix] 01/07: renamed pubsub/test to pubsub/integration

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 2f0d2f89e9378b389a11bd762cfd6f83d8dfa659
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Jun 25 18:56:06 2021 +0200

    renamed pubsub/test to pubsub/integration
---
 bundles/pubsub/CMakeLists.txt                                           | 2 +-
 bundles/pubsub/{test => integration}/CMakeLists.txt                     | 0
 .../pubsub/{test => integration}/meta_data/deadlock.scope.properties    | 0
 .../pubsub/{test => integration}/meta_data/deadlock.scope2.properties   | 0
 bundles/pubsub/{test => integration}/meta_data/msg.descriptor           | 0
 bundles/pubsub/{test => integration}/meta_data/ping.properties          | 0
 bundles/pubsub/{test => integration}/meta_data/ping2.properties         | 0
 bundles/pubsub/{test => integration}/meta_data/ping3.properties         | 0
 bundles/pubsub/{test => integration}/meta_data/pong2.properties         | 0
 bundles/pubsub/{test => integration}/meta_data/pong3.properties         | 0
 bundles/pubsub/{test => integration}/pstm_deadlock_test/test_runner.cc  | 0
 bundles/pubsub/{test => integration}/test/loopback_activator.c          | 0
 bundles/pubsub/{test => integration}/test/msg.h                         | 0
 bundles/pubsub/{test => integration}/test/receive_count_service.h       | 0
 bundles/pubsub/{test => integration}/test/serializer_activator.cc       | 0
 bundles/pubsub/{test => integration}/test/sut_activator.c               | 0
 bundles/pubsub/{test => integration}/test/sut_endpoint_activator.c      | 0
 bundles/pubsub/{test => integration}/test/test_endpoint_runner.cc       | 0
 bundles/pubsub/{test => integration}/test/test_runner.cc                | 0
 bundles/pubsub/{test => integration}/test/tst_activator.c               | 0
 bundles/pubsub/{test => integration}/test/tst_endpoint_activator.c      | 0
 .../PubSubTopicAndScopeIntegrationTestSuite.cc                          | 0
 22 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 3a66b25..d04b80b 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -56,7 +56,7 @@ if (PUBSUB)
     add_subdirectory(examples)
 
     if (ENABLE_TESTING)
-        add_subdirectory(test)
+        add_subdirectory(integration)
     endif()
 
 endif(PUBSUB)
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
similarity index 100%
rename from bundles/pubsub/test/CMakeLists.txt
rename to bundles/pubsub/integration/CMakeLists.txt
diff --git a/bundles/pubsub/test/meta_data/deadlock.scope.properties b/bundles/pubsub/integration/meta_data/deadlock.scope.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/deadlock.scope.properties
rename to bundles/pubsub/integration/meta_data/deadlock.scope.properties
diff --git a/bundles/pubsub/test/meta_data/deadlock.scope2.properties b/bundles/pubsub/integration/meta_data/deadlock.scope2.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/deadlock.scope2.properties
rename to bundles/pubsub/integration/meta_data/deadlock.scope2.properties
diff --git a/bundles/pubsub/test/meta_data/msg.descriptor b/bundles/pubsub/integration/meta_data/msg.descriptor
similarity index 100%
rename from bundles/pubsub/test/meta_data/msg.descriptor
rename to bundles/pubsub/integration/meta_data/msg.descriptor
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/integration/meta_data/ping.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/ping.properties
rename to bundles/pubsub/integration/meta_data/ping.properties
diff --git a/bundles/pubsub/test/meta_data/ping2.properties b/bundles/pubsub/integration/meta_data/ping2.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/ping2.properties
rename to bundles/pubsub/integration/meta_data/ping2.properties
diff --git a/bundles/pubsub/test/meta_data/ping3.properties b/bundles/pubsub/integration/meta_data/ping3.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/ping3.properties
rename to bundles/pubsub/integration/meta_data/ping3.properties
diff --git a/bundles/pubsub/test/meta_data/pong2.properties b/bundles/pubsub/integration/meta_data/pong2.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/pong2.properties
rename to bundles/pubsub/integration/meta_data/pong2.properties
diff --git a/bundles/pubsub/test/meta_data/pong3.properties b/bundles/pubsub/integration/meta_data/pong3.properties
similarity index 100%
rename from bundles/pubsub/test/meta_data/pong3.properties
rename to bundles/pubsub/integration/meta_data/pong3.properties
diff --git a/bundles/pubsub/test/pstm_deadlock_test/test_runner.cc b/bundles/pubsub/integration/pstm_deadlock_test/test_runner.cc
similarity index 100%
rename from bundles/pubsub/test/pstm_deadlock_test/test_runner.cc
rename to bundles/pubsub/integration/pstm_deadlock_test/test_runner.cc
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/integration/test/loopback_activator.c
similarity index 100%
rename from bundles/pubsub/test/test/loopback_activator.c
rename to bundles/pubsub/integration/test/loopback_activator.c
diff --git a/bundles/pubsub/test/test/msg.h b/bundles/pubsub/integration/test/msg.h
similarity index 100%
rename from bundles/pubsub/test/test/msg.h
rename to bundles/pubsub/integration/test/msg.h
diff --git a/bundles/pubsub/test/test/receive_count_service.h b/bundles/pubsub/integration/test/receive_count_service.h
similarity index 100%
rename from bundles/pubsub/test/test/receive_count_service.h
rename to bundles/pubsub/integration/test/receive_count_service.h
diff --git a/bundles/pubsub/test/test/serializer_activator.cc b/bundles/pubsub/integration/test/serializer_activator.cc
similarity index 100%
rename from bundles/pubsub/test/test/serializer_activator.cc
rename to bundles/pubsub/integration/test/serializer_activator.cc
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/integration/test/sut_activator.c
similarity index 100%
rename from bundles/pubsub/test/test/sut_activator.c
rename to bundles/pubsub/integration/test/sut_activator.c
diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/integration/test/sut_endpoint_activator.c
similarity index 100%
rename from bundles/pubsub/test/test/sut_endpoint_activator.c
rename to bundles/pubsub/integration/test/sut_endpoint_activator.c
diff --git a/bundles/pubsub/test/test/test_endpoint_runner.cc b/bundles/pubsub/integration/test/test_endpoint_runner.cc
similarity index 100%
rename from bundles/pubsub/test/test/test_endpoint_runner.cc
rename to bundles/pubsub/integration/test/test_endpoint_runner.cc
diff --git a/bundles/pubsub/test/test/test_runner.cc b/bundles/pubsub/integration/test/test_runner.cc
similarity index 100%
rename from bundles/pubsub/test/test/test_runner.cc
rename to bundles/pubsub/integration/test/test_runner.cc
diff --git a/bundles/pubsub/test/test/tst_activator.c b/bundles/pubsub/integration/test/tst_activator.c
similarity index 100%
rename from bundles/pubsub/test/test/tst_activator.c
rename to bundles/pubsub/integration/test/tst_activator.c
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/integration/test/tst_endpoint_activator.c
similarity index 100%
rename from bundles/pubsub/test/test/tst_endpoint_activator.c
rename to bundles/pubsub/integration/test/tst_endpoint_activator.c
diff --git a/bundles/pubsub/test/topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc b/bundles/pubsub/integration/topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc
similarity index 100%
rename from bundles/pubsub/test/topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc
rename to bundles/pubsub/integration/topic_different_scope_test/PubSubTopicAndScopeIntegrationTestSuite.cc

[celix] 06/07: Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 461a2cea194c0bc203e03af38ce21a9a15d48182
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Sun Jun 27 22:36:30 2021 +0200

    Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1.
---
 bundles/pubsub/CMakeLists.txt                      |   9 +-
 bundles/pubsub/integration/CMakeLists.txt          |   4 +-
 bundles/pubsub/integration/src/tst_activator.c     |   6 +-
 bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt  |  22 +--
 .../v1/src/pubsub_tcp_topic_receiver.c             |   6 +-
 .../v2/src/pubsub_tcp_topic_receiver.c             | 183 +++++++++------------
 .../pubsub_admin_websocket/v1/CMakeLists.txt       |  20 ++-
 .../v1/src/pubsub_websocket_topic_receiver.c       |   5 +-
 bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt  |  20 ++-
 .../v1/src/pubsub_zmq_topic_receiver.c             |  59 +------
 .../v2/src/pubsub_zmq_topic_receiver.c             | 172 +++++++++----------
 .../include/pubsub_interceptors_handler.h          |   2 +-
 .../pubsub_spi/src/pubsub_interceptors_handler.c   |  41 ++---
 13 files changed, 232 insertions(+), 317 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 9285195..482ee3a 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -21,14 +21,16 @@ if (PUBSUB)
     option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" ON)
     if (BUILD_PUBSUB_PSA_ZMQ)
         option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
-        add_subdirectory(pubsub_admin_zmq/v1)
+        add_subdirectory(pubsub_admin_zmq/v1) #TODO option for v1 admins
         add_subdirectory(pubsub_admin_zmq/v2)
+        add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq_v1) #TODO move to config and set to v2
     endif (BUILD_PUBSUB_PSA_ZMQ)
 
     option(BUILD_PUBSUB_PSA_TCP "Build TCP PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_TCP)
-        add_subdirectory(pubsub_admin_tcp/v1)
+        add_subdirectory(pubsub_admin_tcp/v1) #TODO option for v1 admins
         add_subdirectory(pubsub_admin_tcp/v2)
+        add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp_v1) #TODO move to config and set to v2
     endif (BUILD_PUBSUB_PSA_TCP)
 
     option(BUILD_PUBSUB_PSA_UDP_MC "Build UDP MC PubSub Admin" ON)
@@ -38,8 +40,9 @@ if (PUBSUB)
 
     option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
-        add_subdirectory(pubsub_admin_websocket/v1)
+        add_subdirectory(pubsub_admin_websocket/v1) #TODO option for v1 admins
         add_subdirectory(pubsub_admin_websocket/v2)
+        add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket_v1) #TODO move to config and set to v2
     endif (BUILD_PUBSUB_PSA_WS)
 
     add_subdirectory(pubsub_api)
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index a134b74..9c9ce54 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -793,11 +793,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
     celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
     celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::pubsub_admin_zmq PUBSUB_ZMQ_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE)
     celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
+    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq_v2 Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
     target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
             PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
             PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
diff --git a/bundles/pubsub/integration/src/tst_activator.c b/bundles/pubsub/integration/src/tst_activator.c
index 8ce13ac..4017326 100644
--- a/bundles/pubsub/integration/src/tst_activator.c
+++ b/bundles/pubsub/integration/src/tst_activator.c
@@ -86,7 +86,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
 CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
 
 
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata  __attribute__((unused)), bool *release  __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId  __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata  __attribute__((unused)), bool *release) {
     struct activator *act = handle;
 
     msg_t *msg = voidMsg;
@@ -100,6 +100,10 @@ static int tst_receive(void *handle, const char * msgType __attribute__((unused)
     pthread_mutex_lock(&act->mutex);
     act->count1 += 1;
     pthread_mutex_unlock(&act->mutex);
+
+    *release = false;
+    free(voidMsg);
+
     return CELIX_SUCCESS;
 }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
index 0314cb6..2f1bca8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt
@@ -15,10 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
+message(WARNING "PubSub TCP Admin V1 is deprecated, use PubSub TCP Admin v2 instead")
+
 find_package(UUID REQUIRED)
 
-add_celix_bundle(celix_pubsub_admin_tcp
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp"
+add_celix_bundle(celix_pubsub_admin_tcp_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp_v1"
     VERSION "1.0.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -30,15 +32,15 @@ add_celix_bundle(celix_pubsub_admin_tcp
         src/pubsub_tcp_common.c
 )
 
-set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::framework Celix::dfi Celix::log_helper)
-target_include_directories(celix_pubsub_admin_tcp PRIVATE src)
+set_target_properties(celix_pubsub_admin_tcp_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::framework Celix::dfi Celix::log_helper)
+target_include_directories(celix_pubsub_admin_tcp_v1 PRIVATE src)
 # cmake find package UUID set the wrong include dir for OSX
 if (NOT APPLE)
-    target_link_libraries(celix_pubsub_admin_tcp PRIVATE UUID::lib)
+    target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE UUID::lib)
 endif()
 
-install_celix_bundle(celix_pubsub_admin_tcp EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp)
+install_celix_bundle(celix_pubsub_admin_tcp_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_tcp_v1 ALIAS celix_pubsub_admin_tcp_v1)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index b1b2c58..8acb8e2 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -560,9 +560,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                         pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
                         svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
                         pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
-                        if (!release && hashMapIterator_hasNext(&iter)) {
-                            //receive function has taken ownership and still more receive function to come ..
-                            //deserialize again for new message
+                        if (!release) {
+                            //receive function has taken ownership, deserialize again for new message
                             status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
                             if (status != CELIX_SUCCESS) {
                                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
@@ -574,6 +573,7 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
                             release = true;
                         }
                     }
+                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
                     if (release) {
                         msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
                     }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index a49ff23..ad321e8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -79,7 +79,7 @@ struct pubsub_tcp_topic_receiver {
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
+        hash_map_t *map; //key = long svc id, value = psa_tcp_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -92,12 +92,12 @@ typedef struct psa_tcp_requested_connection_entry {
 } psa_tcp_requested_connection_entry_t;
 
 typedef struct psa_tcp_subscriber_entry {
-    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+    pubsub_subscriber_t* subscriberSvc;
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
 static void *psa_tcp_recvThread(void *data);
 static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
@@ -229,8 +229,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = receiver;
-        opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
+        opts.addWithProperties = pubsub_tcpTopicReceiver_addSubscriber;
+        opts.removeWithProperties = pubsub_tcpTopicReceiver_removeSubscriber;
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
 
@@ -259,19 +259,11 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
         celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hashMap_destroy(entry->subscriberServices, false, false);
-                free(entry);
-            }
-        }
-        hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.map, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
@@ -394,11 +386,9 @@ void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                  const celix_bundle_t *bnd) {
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
@@ -415,62 +405,78 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
         return;
     }
 
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *)bndId);
-    if (entry != NULL) {
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-    } else {
-        //new create entry
-        entry = calloc(1, sizeof(*entry));
-        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->initialized = false;
-        receiver->subscribers.allInitialized = false;
+    psa_tcp_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+    entry->subscriberSvc = svc;
+    entry->initialized = false;
 
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-        hashMap_put(receiver->subscribers.map, (void *)bndId, entry);
-    }
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props,
-                                                     const celix_bundle_t *bnd) {
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
-    if (entry != NULL) {
-        hashMap_remove(entry->subscriberServices, (void*)svcId);
-    }
-    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
-        //remove entry
-        hashMap_remove(receiver->subscribers.map, (void *) bndId);
-        hashMap_destroy(entry->subscriberServices, false, false);
-        free(entry);
+    psa_tcp_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void *)svcId);
+    free(entry);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+    *release = true;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+            entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
+            if (!(*release)) {
+                //receive function has taken ownership, deserialize again for new message
+                struct iovec deSerializeBuffer;
+                deSerializeBuffer.iov_base = message->payload.payload;
+                deSerializeBuffer.iov_len = message->payload.length;
+                celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+                                                                             message->header.msgId,
+                                                                             message->header.msgMajorVersion,
+                                                                             message->header.msgMinorVersion,
+                                                                             &deSerializeBuffer, 0, msg);
+                if (status != CELIX_SUCCESS) {
+                    L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                           receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                    break;
+                }
+            }
+            *release = true;
+        }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void
-processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
-                             const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
-    //NOTE receiver->subscribers.mutex locked
+static inline void processMsg(void* handle, const pubsub_protocol_message_t *message, bool* releaseMsg, struct timespec *receiveTime) {
+    pubsub_tcp_topic_receiver_t *receiver = handle;
 
-    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+    const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
     if (msgFqn == NULL) {
         L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
         return;
     }
-
     void *deSerializedMsg = NULL;
-    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId,
+                                                                    message->header.msgMajorVersion,
+                                                                    message->header.msgMinorVersion);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
         deSerializeBuffer.iov_len = message->payload.length;
-        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+                                                                     message->header.msgMajorVersion,
+                                                                     message->header.msgMinorVersion,
+                                                                     &deSerializeBuffer, 0, &deSerializedMsg);
+
         // When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
         if (message->payload.payload == deSerializedMsg) {
             *releaseMsg = true;
@@ -479,33 +485,15 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
-            bool release = true;
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
+                                                                  deSerializedMsg, &metadata);
             if (cont) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
-                                   msgFqn,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope,
-                                   receiver->topic);
-                            break;
-                        }
-                        release = true;
-                    }
-                }
+                bool release;
+                callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
-                }
-                if (message->metadata.metadata) {
-                    celix_properties_destroy(message->metadata.metadata);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
+                                                                 deSerializedMsg);
                 }
             }
         } else {
@@ -516,27 +504,13 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
         L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
                msgFqn,
                pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
-               (int)message->header.msgMajorVersion,
-               (int)message->header.msgMinorVersion,
+               (int) message->header.msgMajorVersion,
+               (int) message->header.msgMinorVersion,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
                pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
 }
 
-static void
-processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime) {
-    pubsub_tcp_topic_receiver_t *receiver = handle;
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, message, release, receiveTime);
-        }
-    }
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
-
 static void *psa_tcp_recvThread(void *data) {
     pubsub_tcp_topic_receiver_t *receiver = data;
 
@@ -642,20 +616,15 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    int rc = 0;
-                    if (svc != NULL && svc->init != NULL) {
-                        rc = svc->init(svc->handle);
-                    }
-                    if (rc == 0) {
-                        //note now only initialized on first subscriber entries added.
-                        entry->initialized = true;
-                    } else {
-                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                        allInitialized = false;
-                    }
+                int rc = 0;
+                if (entry->subscriberSvc->init != NULL) {
+                    rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
index f6397e0..021310f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+message(WARNING "PubSub Websocket Admin V1 is deprecated, use PubSub Websocket Admin v2 instead")
+
 find_package(Jansson REQUIRED)
 find_package(UUID REQUIRED)
 
-add_celix_bundle(celix_pubsub_admin_websocket
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+add_celix_bundle(celix_pubsub_admin_websocket_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket_v1"
     VERSION "1.0.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -30,16 +32,16 @@ add_celix_bundle(celix_pubsub_admin_websocket
         src/pubsub_websocket_common.c
 )
 
-set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE
+set_target_properties(celix_pubsub_admin_websocket_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE
         Celix::framework Celix::dfi Celix::log_helper Celix::utils
         Celix::http_admin_api
 )
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_websocket PRIVATE
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_websocket_v1 PRIVATE
     src
 )
 
-install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
+install_celix_bundle(celix_pubsub_admin_websocket_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket_v1 ALIAS celix_pubsub_admin_websocket_v1)
diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
index 7d8cd00..7b2cfcc 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
@@ -500,9 +500,8 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
                 while (hashMapIterator_hasNext(&iter)) {
                     pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
                     svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
+                    if (!release) {
+                        //receive function has taken ownership, deserialize again for new message
                         status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg);
                         if (status != CELIX_SUCCESS) {
                             L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
index 52723b9..bfd8eef 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+message(WARNING "PubSub ZMQ Admin V1 is deprecated, use PubSub ZMQ Admin v2 instead")
+
 find_package(ZMQ REQUIRED)
 find_package(CZMQ REQUIRED)
 find_package(Jansson REQUIRED)
@@ -31,8 +33,8 @@ if (BUILD_ZMQ_SECURITY)
     set (ZMQ_CRYPTO_C "src/zmq_crypto.c")
 endif()
 
-add_celix_bundle(celix_pubsub_admin_zmq
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
+add_celix_bundle(celix_pubsub_admin_zmq_v1
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq_v1"
     VERSION "1.1.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -43,17 +45,17 @@ add_celix_bundle(celix_pubsub_admin_zmq
         ${ZMQ_CRYPTO_C}
 )
 
-set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE
+set_target_properties(celix_pubsub_admin_zmq_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE
 
         Celix::framework Celix::dfi Celix::log_helper Celix::utils
         ZMQ::lib CZMQ::lib ${OPTIONAL_OPENSSL_LIB}
 )
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_zmq PRIVATE
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_zmq_v1 PRIVATE
     src
 )
 
-install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
+install_celix_bundle(celix_pubsub_admin_zmq_v1 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::shell_api)
+add_library(Celix::celix_pubsub_admin_zmq_v1 ALIAS celix_pubsub_admin_zmq_v1)
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 11f2b8f..9cd99f4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -533,10 +533,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                     while (hashMapIterator_hasNext(&iter2)) {
                         pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
                         svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release);
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
-                        if (!release && hashMapIterator_hasNext(&iter2)) {
-                            //receive function has taken ownership and still more receive function to come ..
-                            //deserialize again for new message
+                        if (!release) {
+                            //receive function has taken ownership, deserialize again for new message
                             status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg);
                             if (status != CELIX_SUCCESS) {
                                 L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
@@ -544,6 +542,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                             }
                             release = true;
                         }
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata);
                     }
                     if (release) {
                         msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
@@ -558,58 +557,6 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     } else {
         L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
     }
-
-    if (msgSer != NULL && monitor) {
-        // TODO disabled for now, should move to an interceptor?
-//        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )message->header.msgId);
-//        char uuidStr[UUID_STR_LEN+1];
-//        uuid_unparse(hdr->originUUID, uuidStr);
-//        psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
-//
-//        if (metrics == NULL) {
-//            metrics = calloc(1, sizeof(*metrics));
-//            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
-//            uuid_copy(metrics->origin, hdr->originUUID);
-//            metrics->msgTypeId = hdr->type;
-//            metrics->maxDelayInSeconds = -INFINITY;
-//            metrics->minDelayInSeconds = INFINITY;
-//            metrics->lastSeqNr = 0;
-//        }
-//
-//        double diff = celix_difftime(&beginSer, &endSer);
-//        long n = metrics->nrOfMessagesReceived;
-//        metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-//
-//        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
-//        n = metrics->nrOfMessagesReceived;
-//        if (metrics->nrOfMessagesReceived >= 1) {
-//            metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
-//        }
-//        metrics->lastMessageReceived = *receiveTime;
-//
-//
-//        int incr = hdr->seqNr - metrics->lastSeqNr;
-//        if (metrics->lastSeqNr >0 && incr > 1) {
-//            metrics->nrOfMissingSeqNumbers += (incr - 1);
-//            L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
-//        }
-//        metrics->lastSeqNr = hdr->seqNr;
-//
-//        struct timespec sendTime;
-//        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
-//        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
-//        diff = celix_difftime(&sendTime, receiveTime);
-//        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
-//        if (diff < metrics->minDelayInSeconds) {
-//            metrics->minDelayInSeconds = diff;
-//        }
-//        if (diff > metrics->maxDelayInSeconds) {
-//            metrics->maxDelayInSeconds = diff;
-//        }
-//
-//        metrics->nrOfMessagesReceived += updateReceiveCount;
-//        metrics->nrOfSerializationErrors += updateSerError;
-    }
 }
 
 static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 90e9510..0fc2bc2 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -93,7 +93,7 @@ struct pubsub_zmq_topic_receiver {
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+        hash_map_t *map; //key = long svc id, value = psa_zmq_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -105,13 +105,13 @@ typedef struct psa_zmq_requested_connection_entry {
 } psa_zmq_requested_connection_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
-    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+    pubsub_subscriber_t* subscriberSvc;
     bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
 static void* psa_zmq_recvThread(void * data);
 static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
 static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
@@ -237,8 +237,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = receiver;
-        opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_zmqTopicReceiver_removeSubscriber;
+        opts.addWithProperties = pubsub_zmqTopicReceiver_addSubscriber;
+        opts.removeWithProperties = pubsub_zmqTopicReceiver_removeSubscriber;
 
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
@@ -275,19 +275,11 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
         celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL)  {
-                hashMap_destroy(entry->subscriberServices, false, false);
-                free(entry);
-            }
-        }
-        hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.map, false, true);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
@@ -384,10 +376,9 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receive
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL) {
@@ -404,107 +395,104 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         return;
     }
 
+    psa_zmq_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+    entry->subscriberSvc = svc;
+    entry->initialized = false;
+
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-    } else {
-        //new create entry
-        entry = calloc(1, sizeof(*entry));
-        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->initialized = false;
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-    }
+    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_zmq_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_remove(entry->subscriberServices, (void*)svcId);
-    }
-    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
-        //remove entry
-        hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        hashMap_destroy(entry->subscriberServices, false, false);
-        free(entry);
+    psa_zmq_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId);
+    free(entry);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void callReceivers(pubsub_zmq_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+    *release = true;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_zmq_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+            entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
+            if (!(*release)) {
+                //receive function has taken ownership, deserialize again for new message
+                struct iovec deSerializeBuffer;
+                deSerializeBuffer.iov_base = message->payload.payload;
+                deSerializeBuffer.iov_len = message->payload.length;
+                celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+                                                             message->header.msgId,
+                                                             message->header.msgMajorVersion,
+                                                             message->header.msgMinorVersion,
+                                                              &deSerializeBuffer, 0, msg);
+                if (status != CELIX_SUCCESS) {
+                    L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                           receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                    break;
+                }
+            }
+            *release = true;
+        }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
-    const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
+    const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
     if (msgFqn == NULL) {
         L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
         return;
     }
-
     void *deserializedMsg = NULL;
-    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId,
+                                                                    message->header.msgMajorVersion,
+                                                                    message->header.msgMinorVersion);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
-        deSerializeBuffer.iov_len  = message->payload.length;
-        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+        deSerializeBuffer.iov_len = message->payload.length;
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+                                                                     message->header.msgMajorVersion,
+                                                                     message->header.msgMinorVersion,
+                                                                     &deSerializeBuffer, 0, &deserializedMsg);
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
-            bool release = true;
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
+                                                                  deserializedMsg, &metadata);
             if (cont) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
-                    if (!release && hashMapIterator_hasNext(&iter2)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
-                        }
-                        release = true;
-                    }
-                }
+                bool release;
+                callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
+                                                                 deserializedMsg);
                 }
             }
         } else {
-            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
         L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
                msgFqn,
                pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
-               (int)message->header.msgMajorVersion,
-               (int)message->header.msgMinorVersion,
+               (int) message->header.msgMajorVersion,
+               (int) message->header.msgMinorVersion,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
                pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
     }
 }
 
-static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, message, receiveTime);
-        }
-    }
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
-}
-
 static void* psa_zmq_recvThread(void * data) {
     pubsub_zmq_topic_receiver_t *receiver = data;
 
@@ -627,20 +615,16 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    int rc = 0;
-                    if (svc != NULL && svc->init != NULL) {
-                        rc = svc->init(svc->handle);
-                    }
-                    if (rc == 0) {
-                        //note now only initialized on first subscriber entries added.
-                        entry->initialized = true;
-                    } else {
-                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                        allInitialized = false;
-                    }
+                int rc = 0;
+                if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) {
+                    rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+                }
+                if (rc == 0) {
+                    //note now only initialized on first subscriber entries added.
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 801fd35..6595853 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -37,7 +37,7 @@ celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *
 
 bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
 void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
 void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
 
 #ifdef __cplusplus
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 331a41d..6118fbe 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -18,7 +18,8 @@
  */
 #include "celix_bundle_context.h"
 #include "celix_constants.h"
-#include "utils.h"
+#include "celix_array_list.h"
+#include "celix_utils.h"
 
 #include "pubsub_interceptors_handler.h"
 
@@ -91,8 +92,8 @@ void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const cel
     celixThreadMutex_lock(&handler->lock);
 
     bool exists = false;
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor == svc) {
             exists = true;
         }
@@ -114,11 +115,11 @@ void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attr
 
     celixThreadMutex_lock(&handler->lock);
 
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor == svc) {
-            void *old = arrayList_remove(handler->interceptors, i);
-            free(old);
+            celix_arrayList_removeAt(handler->interceptors, i);
+            free(entry);
             break;
         }
     }
@@ -131,12 +132,12 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
 
     celixThreadMutex_lock(&handler->lock);
 
-    if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) {
+    if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) {
         *metadata = celix_properties_create();
     }
 
-    for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
-        entry_t *entry = arrayList_get(handler->interceptors, i - 1);
+    for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1);
         if (entry->interceptor->preSend != NULL) {
             cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
         }
@@ -153,8 +154,8 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
 void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     celixThreadMutex_lock(&handler->lock);
 
-    for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
-        entry_t *entry = arrayList_get(handler->interceptors, i - 1);
+    for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1);
         if (entry->interceptor->postSend != NULL) {
             entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
         }
@@ -163,15 +164,17 @@ void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *hand
     celixThreadMutex_unlock(&handler->lock);
 }
 
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata) {
     bool cont = true;
 
     celixThreadMutex_lock(&handler->lock);
-
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) {
+        *metadata = celix_properties_create();
+    }
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor->preReceive != NULL) {
-            cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+            cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
         }
         if (!cont) {
             break;
@@ -186,8 +189,8 @@ bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *ha
 void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     celixThreadMutex_lock(&handler->lock);
 
-    for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
-        entry_t *entry = arrayList_get(handler->interceptors, i);
+    for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) {
+        entry_t *entry = celix_arrayList_get(handler->interceptors, i);
         if (entry->interceptor->postReceive != NULL) {
             entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
         }

[celix] 04/07: Adds missing ifdef c++ checks and update interceptor handle to not create a empty metadata if not present on the wire.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 39979064bc42cedac9f533b860403ae4eb532dcc
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Sun Jun 27 16:39:42 2021 +0200

    Adds missing ifdef c++ checks and update interceptor handle to not create a empty metadata if not present on the wire.
---
 bundles/pubsub/integration/CMakeLists.txt          | 28 ++++++++-
 bundles/pubsub/integration/src/sut_activator.c     | 11 +++-
 .../pubsub/pubsub_api/include/pubsub/publisher.h   | 14 ++---
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  | 19 +++---
 bundles/pubsub/pubsub_spi/include/pubsub_admin.h   | 14 ++---
 .../pubsub_spi/include/pubsub_admin_metrics.h      |  7 +++
 .../pubsub/pubsub_spi/include/pubsub_interceptor.h | 67 ++++++++++++++++++++++
 .../include/pubsub_interceptors_handler.h          | 15 +++--
 .../pubsub/pubsub_spi/include/pubsub_listeners.h   |  9 ++-
 .../include/pubsub_message_serialization_marker.h  |  7 +++
 .../include/pubsub_message_serialization_service.h |  7 +++
 .../pubsub/pubsub_spi/include/pubsub_protocol.h    |  7 +++
 .../pubsub/pubsub_spi/include/pubsub_serializer.h  |  7 +++
 .../pubsub_spi/src/pubsub_interceptors_handler.c   | 32 +++++------
 14 files changed, 197 insertions(+), 47 deletions(-)

diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 3cda520..a134b74 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -754,7 +754,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     setup_target_for_coverage(pstm_deadlock_zmq_v2_test SCAN_DIR ..)
 endif ()
 
-
 if (BUILD_PUBSUB_PSA_ZMQ)
     #Test suite to test if component with same topic and different scope combinations work
     add_executable(test_pubsub_topic_and_scope_integration
@@ -780,4 +779,31 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}"
             PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}"
     )
+endif ()
+
+if (BUILD_PUBSUB_PSA_ZMQ)
+    #Test suite to test pubsub interceptors
+    add_executable(test_pubsub_interceptors_integration
+            gtest/PubSubInterceptorTestSuite.cc
+        )
+    target_link_libraries(test_pubsub_interceptors_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi)
+    target_include_directories(test_pubsub_interceptors_integration PRIVATE src)
+    setup_target_for_coverage(test_pubsub_interceptors_integration SCAN_DIR ..)
+
+    #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles
+    celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::pubsub_admin_zmq PUBSUB_ZMQ_BUNDLE_FILE)
+    celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE)
+    celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
+    celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
+    add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst)
+    target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
+            PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
+            PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
+            PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}"
+            PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}"
+            PUBSUB_PUBLISHER_BUNDLE_FILE="${PUBSUB_PUBLISHER_BUNDLE_FILE}"
+            PUBSUB_SUBSCRIBER_BUNDLE_FILE="${PUBSUB_SUBSCRIBER_BUNDLE_FILE}"
+    )
 endif ()
\ No newline at end of file
diff --git a/bundles/pubsub/integration/src/sut_activator.c b/bundles/pubsub/integration/src/sut_activator.c
index 019ef7c..5a829e7 100644
--- a/bundles/pubsub/integration/src/sut_activator.c
+++ b/bundles/pubsub/integration/src/sut_activator.c
@@ -30,6 +30,8 @@ static void sut_pubSet(void *handle, void *service);
 static void* sut_sendThread(void *data);
 
 struct activator {
+    bool addMetadata;
+
     long pubTrkId;
 
     pthread_t sendThread;
@@ -41,6 +43,8 @@ struct activator {
 
 celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
 
+    act->addMetadata = celix_bundleContext_getPropertyAsBool(ctx, "CELIX_PUBSUB_TEST_ADD_METADATA", false);
+
     char filter[512];
     bool useNegativeScopeFilter = celix_bundleContext_getPropertyAsBool(ctx, "CELIX_PUBSUB_TEST_USE_NEGATIVE_SCOPE_FILTER", true);
     if (useNegativeScopeFilter) {
@@ -91,7 +95,12 @@ static void* sut_sendThread(void *data) {
             if (msgId == 0) {
                 act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
             }
-            act->pubSvc->send(act->pubSvc->handle, msgId, &msg, NULL);
+            celix_properties_t* metadata = NULL;
+            if (act->addMetadata) {
+                metadata = celix_properties_create();
+                celix_properties_setLong(metadata, "seqNr", (long)msgId);
+            }
+            act->pubSvc->send(act->pubSvc->handle, msgId, &msg, metadata);
             if (msg.seqNr % 1000 == 0) {
                 printf("Send %i messages\n", msg.seqNr);
             }
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 72cafbe..68eb839 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -16,17 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/**
- * publisher.h
- *
- *  \date       Jan 7, 2016
- *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright  Apache License, Version 2.0
- */
 
 #ifndef __PUBSUB_PUBLISHER_H_
 #define __PUBSUB_PUBLISHER_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <stdlib.h>
 
 #include "celix_properties.h"
@@ -75,4 +72,7 @@ struct pubsub_publisher {
 };
 typedef struct pubsub_publisher pubsub_publisher_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif // __PUBSUB_PUBLISHER_H_
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 1170f2c..ccc62e4 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -16,17 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/**
- * subscriber.h
- *
- *  \date       Jan 7, 2016
- *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright  Apache License, Version 2.0
- */
 
 #ifndef __PUBSUB_SUBSCRIBER_H_
 #define __PUBSUB_SUBSCRIBER_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <stdbool.h>
 
 #include "celix_properties.h"
@@ -45,6 +42,8 @@ struct pubsub_subscriber_struct {
     /**
      * Called to initialize the subscriber with the receiver thread.
      * Can be used to tweak the receiver thread attributes
+     *
+     * This method can be NULL.
      */
     int (*init)(void *handle);
      
@@ -66,7 +65,7 @@ struct pubsub_subscriber_struct {
       * @param msgType      The fully qualified type name
       * @param msgTypeId    The local type id of the type, how this is calculated/created is up to the pubsub admin. (can be cached for improved performance)
       * @param msg          The pointer to the message
-      * @param metadata     The meta data provided with the data. Can be NULL.
+      * @param metadata     The meta data provided with the data. Can be NULL and is only valid during the callback.
       * @param release      Pointer to the release boolean, default is release is true.
       * @return Return 0 implies a successful handling. If return is not 0, the msg will always be released by the pubsubadmin.
       */
@@ -75,5 +74,7 @@ struct pubsub_subscriber_struct {
 };
 typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
 
-
+#ifdef __cplusplus
+}
+#endif
 #endif //  __PUBSUB_SUBSCRIBER_H_
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 15d5d16..e142883 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -16,17 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/**
- * pubsub_admin.h
- *
- *  \date       Sep 30, 2011
- *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright  Apache License, Version 2.0
- */
 
 #ifndef PUBSUB_ADMIN_H_
 #define PUBSUB_ADMIN_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "celix_properties.h"
 #include "celix_bundle.h"
 #include "celix_filter.h"
@@ -62,6 +59,9 @@ struct pubsub_admin_service {
 
 typedef struct pubsub_admin_service pubsub_admin_service_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_ADMIN_H_ */
 
 
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h
index 449ec63..628eda0 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h
@@ -20,6 +20,10 @@
 #ifndef PUBSUB_ADMIN_METRICS_H_
 #define PUBSUB_ADMIN_METRICS_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <uuid/uuid.h>
 #include <sys/time.h>
 #include "celix_array_list.h"
@@ -101,6 +105,9 @@ void pubsub_freePubSubAdminMetrics(pubsub_admin_metrics_t *metrics);
 
 typedef struct pubsub_admin_metrics_service pubsub_admin_metrics_service_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_ADMIN_METRICS_H_ */
 
 
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
index 75bd3d6..eff1e78 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -20,6 +20,10 @@
 #ifndef __PUBSUB_INTERCEPTOR_H
 #define __PUBSUB_INTERCEPTOR_H
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <stdlib.h>
 #include <stdint.h>
 
@@ -33,15 +37,78 @@ typedef struct pubsub_interceptor_properties {
     const char *topic;
 } pubsub_interceptor_properties_t;
 
+/**
+ * @brief PubSub Interceptor which can be used to intercept pubsub publish/receive callbacks
+ *
+ */
 struct pubsub_interceptor {
+    /**
+     * Service handle.
+     */
     void *handle;
 
+    /**
+     * @brief preSend will be called when a user calls send on a pubsub/publisher, but before the message is "handed over" to the actual pubsub technology (e.g. TCP stack, shared memory, etc.)
+     *
+     * This function can be NULL.
+     *
+     * @param handle The service handle
+     * @param properties The scope and topic of the sending publisher
+     * @param messageType The fqn of the message
+     * @param msgTypeId The (local) type id of the message
+     * @param message The actual message pointer
+     * @param metadata The metadata of the message
+     * @return True if the send should continue.
+     */
     bool (*preSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+
+    /**
+     * @brief postSend will be called when a user calls send on a pubsub/publisher, but after the message is "handed over" to the actual pubsub technology (e.g. TCP stack, shared memory, etc.)
+     *
+     * This function can be NULL.
+     *
+     * @param handle The service handle
+     * @param properties The scope and topic of the sending publisher
+     * @param messageType The fqn of the message
+     * @param msgTypeId The (local) type id of the message
+     * @param message The actual message pointer
+     * @param metadata The metadata of the message
+     */
     void (*postSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+
+    /**
+     * @brief preReceive will be called when a message is received in a pubsub admin, but before the pubsub/subscriber callback is called.
+     *
+     * This function can be NULL.
+     *
+     * @param handle The service handle
+     * @param properties The scope and topic of the sending publisher
+     * @param messageType The fqn of the message
+     * @param msgTypeId The (local) type id of the message
+     * @param message The actual message pointer
+     * @param metadata The metadata of the message
+     * @return True if the pubsub/subscriber callback should be called.
+     */
     bool (*preReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+
+    /**
+     * @brief postReceive will be called when a message is received in a pubsub admin, after the pubsub/subscriber callback has been called.
+     *
+     * This function can be NULL.
+     *
+     * @param handle The service handle
+     * @param properties The scope and topic of the sending publisher
+     * @param messageType The fqn of the message
+     * @param msgTypeId The (local) type id of the message
+     * @param message The actual message pointer
+     * @param metadata The metadata of the message
+     */
     void (*postReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 };
 
 typedef struct pubsub_interceptor pubsub_interceptor_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif //__PUBSUB_INTERCEPTOR_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 2334fb8..801fd35 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -19,6 +19,10 @@
 #ifndef PUBSUB_INTERCEPTORS_HANDLER_H
 #define PUBSUB_INTERCEPTORS_HANDLER_H
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <stdint.h>
 
 #include "celix_errno.h"
@@ -31,9 +35,12 @@ typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
 celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
 celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
 
-bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
 
+#ifdef __cplusplus
+}
+#endif
 #endif //PUBSUB_INTERCEPTORS_HANDLER_H
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
index 0982a0f..67d149f 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -20,8 +20,11 @@
 #ifndef PUBSUB_LISTENERS_H_
 #define PUBSUB_LISTENERS_H_
 
-#include "celix_properties.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
 
+#include "celix_properties.h"
 
 #define PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE "pubsub_discovered_endpoint_listener"
 
@@ -48,5 +51,7 @@ struct pubsub_announce_endpoint_listener {
 
 typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;
 
-
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_LISTENERS_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
index a4ff06b..919b8f4 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
@@ -20,6 +20,10 @@
 #ifndef PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
 #define PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "hash_map.h"
 #include "version.h"
 #include "celix_bundle.h"
@@ -57,4 +61,7 @@ typedef struct pubsub_message_serialization_marker {
     void* handle;
 } pubsub_message_serialization_marker_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
index 09df5a5..c47d9eb 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
@@ -20,6 +20,10 @@
 #ifndef PUBSUB_MESSAGE_SERIALIZATION_SERVICE_H_
 #define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "hash_map.h"
 #include "version.h"
 #include "celix_bundle.h"
@@ -92,4 +96,7 @@ typedef struct pubsub_message_serialization_service {
 
 } pubsub_message_serialization_service_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_MESSAGE_SERIALIZATION_SERVICE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index ad1a387..5fcf205 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -20,6 +20,10 @@
 #ifndef PUBSUB_PROTOCOL_SERVICE_H_
 #define PUBSUB_PROTOCOL_SERVICE_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include <stdint.h>
 #include "celix_properties.h"
 
@@ -234,4 +238,7 @@ typedef struct pubsub_protocol_service {
     celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
 } pubsub_protocol_service_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
index ed7208b..6e251b2 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -20,6 +20,10 @@
 #ifndef PUBSUB_SERIALIZER_SERVICE_H_
 #define PUBSUB_SERIALIZER_SERVICE_H_
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #include "hash_map.h"
 #include "version.h"
 #include "celix_bundle.h"
@@ -61,4 +65,7 @@ typedef struct pubsub_serializer_service {
 
 } pubsub_serializer_service_t;
 
+#ifdef __cplusplus
+}
+#endif
 #endif /* PUBSUB_SERIALIZER_SERVICE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 7fd885f..331a41d 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -126,7 +126,7 @@ void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attr
     celixThreadMutex_unlock(&handler->lock);
 }
 
-bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata) {
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata) {
     bool cont = true;
 
     celixThreadMutex_lock(&handler->lock);
@@ -137,8 +137,9 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
 
     for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
         entry_t *entry = arrayList_get(handler->interceptors, i - 1);
-
-        cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
+        if (entry->interceptor->preSend != NULL) {
+            cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
+        }
         if (!cont) {
             break;
         }
@@ -149,31 +150,29 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl
     return cont;
 }
 
-void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     celixThreadMutex_lock(&handler->lock);
 
     for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) {
         entry_t *entry = arrayList_get(handler->interceptors, i - 1);
-
-        entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+        if (entry->interceptor->postSend != NULL) {
+            entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+        }
     }
 
     celixThreadMutex_unlock(&handler->lock);
 }
 
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata) {
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     bool cont = true;
 
     celixThreadMutex_lock(&handler->lock);
 
-    if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) {
-        *metadata = celix_properties_create();
-    }
-
     for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
         entry_t *entry = arrayList_get(handler->interceptors, i);
-
-        cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata);
+        if (entry->interceptor->preReceive != NULL) {
+            cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+        }
         if (!cont) {
             break;
         }
@@ -184,13 +183,14 @@ bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *ha
     return cont;
 }
 
-void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) {
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) {
     celixThreadMutex_lock(&handler->lock);
 
     for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
         entry_t *entry = arrayList_get(handler->interceptors, i);
-
-        entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+        if (entry->interceptor->postReceive != NULL) {
+            entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata);
+        }
     }
 
     celixThreadMutex_unlock(&handler->lock);

[celix] 03/07: Removes unused pubsub mock stuff

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit fc7174298cd4b3448e3a4883d93ad9e552a78147
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Fri Jun 25 19:07:54 2021 +0200

    Removes unused pubsub mock stuff
---
 bundles/pubsub/CMakeLists.txt                   |  1 -
 bundles/pubsub/mock/CMakeLists.txt              | 40 -------------
 bundles/pubsub/mock/api/pubsub/publisher_mock.h | 44 --------------
 bundles/pubsub/mock/src/publisher_mock.cc       | 57 -------------------
 bundles/pubsub/mock/tst/pubsubmock_test.cc      | 76 -------------------------
 bundles/pubsub/mock/tst/run_tests.cc            | 25 --------
 6 files changed, 243 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index d04b80b..9285195 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -51,7 +51,6 @@ if (PUBSUB)
     add_subdirectory(pubsub_serializer_avrobin)
     add_subdirectory(pubsub_protocol)
     add_subdirectory(keygen)
-    add_subdirectory(mock)
 
     add_subdirectory(examples)
 
diff --git a/bundles/pubsub/mock/CMakeLists.txt b/bundles/pubsub/mock/CMakeLists.txt
deleted file mode 100644
index 59831f9..0000000
--- a/bundles/pubsub/mock/CMakeLists.txt
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-#only install if CppuTest is available
-find_package(CppUTest QUIET)
-if (CPPUTEST_FOUND)
-    include_directories(SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR})
-
-    add_library(pubsub_mock STATIC
-            src/publisher_mock.cc
-    )
-    target_link_libraries(pubsub_mock PRIVATE Celix::pubsub_spi ${CppUTest_LIBRARY})
-    target_include_directories(pubsub_mock PUBLIC api)
-    target_include_directories(pubsub_mock SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR})
-
-    if (ENABLE_TESTING)
-        add_executable(pubsubmock_test
-            tst/pubsubmock_test.cc
-            tst/run_tests.cc
-        )
-        target_include_directories(pubsubmock_test PRIVATE ${CMAKE_CURRENT_LIST_DIR}/api)
-        target_link_libraries(pubsubmock_test PRIVATE Celix::pubsub_api pubsub_mock ${CppUTest_LIBRARY} ${CppUTest_EXT_LIBRARIES} Celix::framework)
-        add_test(NAME pubsubmock_test COMMAND pubsubmock_test)
-    endif()
-endif()
-
diff --git a/bundles/pubsub/mock/api/pubsub/publisher_mock.h b/bundles/pubsub/mock/api/pubsub/publisher_mock.h
deleted file mode 100644
index 72c8285..0000000
--- a/bundles/pubsub/mock/api/pubsub/publisher_mock.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef PUBSUB_PUBLISHER_MOCK_H_
-#define PUBSUB_PUBLISHER_MOCK_H_
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include "pubsub/publisher.h" 
-
-#define PUBSUB_PUBLISHERMOCK_SCOPE "pubsub_publisher"
-#define PUBSUB_PUBLISHERMOCK_LOCAL_MSG_TYPE_ID_FOR_MSG_TYPE_METHOD "pubsub__publisherMock_localMsgTypeIdForMsgType"
-#define PUBSUB_PUBLISHERMOCK_SEND_METHOD "pubsub__publisherMock_send"
-#define PUBSUB_PUBLISHERMOCK_SEND_MULTIPART_METHOD "pubsub__publisherMock_sendMultipart"
-
-
-/*============================================================================
-  MOCK - initialize publisher mock
-  ============================================================================*/
-void pubsub_publisherMock_init(pubsub_publisher_t* srv, void* handle);
-  
-#ifdef __cplusplus
-}
-#endif
-
-#endif //PUBSUB_PUBLISHER_MOCK_H_
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
deleted file mode 100644
index 8d6f7e5..0000000
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <celix_properties.h>
-#include "pubsub/publisher_mock.h"
-#include "CppUTest/TestHarness.h"
-#include "CppUTestExt/MockSupport.h"
-
-/*============================================================================
-  MOCK - mock function for pubsub_publisher->localMsgTypeIdForMsgType
-  ============================================================================*/
-static int pubsub__publisherMock_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
-    return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
-        .actualCall(PUBSUB_PUBLISHERMOCK_LOCAL_MSG_TYPE_ID_FOR_MSG_TYPE_METHOD)
-        .withPointerParameter("handle", handle)
-        .withParameter("msgType", msgType)
-        .withOutputParameter("msgTypeId", msgTypeId)
-        .returnIntValue();
-}
-
-/*============================================================================
-  MOCK - mock function for pubsub_publisher->send
-  ============================================================================*/
-static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata) {
-    return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
-        .actualCall(PUBSUB_PUBLISHERMOCK_SEND_METHOD)
-        .withPointerParameter("handle", handle)
-        .withParameter("msgTypeId", msgTypeId)
-        .withPointerParameter("msg", (void*)msg)
-        .withPointerParameter("metadata", (void*)metadata)
-        .returnIntValue();
-}
-
-/*============================================================================
-  MOCK - mock setup for publisher service
-  ============================================================================*/
-void pubsub_publisherMock_init(pubsub_publisher_t* srv, void* handle) {
-    srv->handle = handle;
-    srv->localMsgTypeIdForMsgType = pubsub__publisherMock_localMsgTypeIdForMsgType;
-    srv->send = pubsub__publisherMock_send;
-}
diff --git a/bundles/pubsub/mock/tst/pubsubmock_test.cc b/bundles/pubsub/mock/tst/pubsubmock_test.cc
deleted file mode 100644
index 19f8734..0000000
--- a/bundles/pubsub/mock/tst/pubsubmock_test.cc
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <CppUTestExt/MockSupport.h>
-#include <CppUTest/TestHarness.h>
-
-#include <stdio.h>
-#include <stdint.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#include "pubsub/publisher_mock.h"
-
-static pubsub_publisher_t mockSrv;
-static void* mockHandle = (void*)0x42;
-
-
-TEST_GROUP(pubsubmock) {
-    void setup(void) {
-        //setup mock
-        pubsub_publisherMock_init(&mockSrv, mockHandle);
-    }
-
-    void teardown() {
-        mock().checkExpectations();
-        mock().clear();
-    }
-};
-
-TEST(pubsubmock, publishermock) {
-    const char* mockFqn = "example.Msg";
-    unsigned int mockOutputTypeId = 11;
-
-    mock(PUBSUB_PUBLISHERMOCK_SCOPE).expectOneCall(PUBSUB_PUBLISHERMOCK_LOCAL_MSG_TYPE_ID_FOR_MSG_TYPE_METHOD)
-        .withParameter("handle", mockHandle)
-    .withParameter("msgType", mockFqn)
-    .withOutputParameterReturning("msgTypeId", &mockOutputTypeId, sizeof(mockOutputTypeId));
-
-    mock(PUBSUB_PUBLISHERMOCK_SCOPE).expectOneCall(PUBSUB_PUBLISHERMOCK_SEND_METHOD)
-        .withParameter("handle", mockHandle)
-        .withParameter("msgTypeId", mockOutputTypeId)
-        .ignoreOtherParameters();
-
-    //This should normally be code which should be tested, for now it code to verify the mock
-    pubsub_publisher_t* srv = &mockSrv;
-    const char* msgFqn = "example.Msg";
-    unsigned int msgId = 0;
-
-    //get type id (normally only if type id is not yet set (e.g. 0))
-    srv->localMsgTypeIdForMsgType(srv->handle, msgFqn, &msgId);
-    CHECK(msgId != 0);
-
-    //set msg
-    void *dummyMsg = (void*)0x43;
-    srv->send(srv->handle, msgId, dummyMsg, NULL); //should satisfy the expectOneCalls
-    //srv->send(srv->handle, msgId, dummyMsg); //enabling this should fail the test
-
-}
-
diff --git a/bundles/pubsub/mock/tst/run_tests.cc b/bundles/pubsub/mock/tst/run_tests.cc
deleted file mode 100644
index 1da4975..0000000
--- a/bundles/pubsub/mock/tst/run_tests.cc
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <CppUTest/TestHarness.h>
-#include "CppUTest/CommandLineTestRunner.h"
-
-int main(int argc, char** argv) {
-    return RUN_ALL_TESTS(argc, argv);
-}

[celix] 07/07: Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 6229f9e0317ae370b11ed3a99b2b6907763abbd0
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 28 21:42:18 2021 +0200

    Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.
---
 .../include/first_interceptor_private.h            |  8 +--
 .../include/second_interceptor_private.h           |  8 +--
 .../pubsub/interceptors/src/first_interceptor.c    |  8 +--
 .../pubsub/interceptors/src/second_interceptor.c   |  8 +--
 .../gtest/PubSubInterceptorTestSuite.cc            | 26 +++++++--
 .../v1/src/pubsub_tcp_topic_receiver.c             |  2 +-
 .../v1/src/pubsub_tcp_topic_sender.c               |  2 +-
 .../v2/src/pubsub_tcp_topic_receiver.c             |  3 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |  3 +-
 .../v1/src/pubsub_zmq_topic_receiver.c             |  2 +-
 .../v1/src/pubsub_zmq_topic_sender.c               |  2 +-
 .../v2/src/pubsub_zmq_topic_receiver.c             |  3 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |  3 +-
 .../pubsub/pubsub_spi/include/pubsub_interceptor.h | 16 ++---
 .../include/pubsub_interceptors_handler.h          | 12 ++--
 .../pubsub_spi/src/pubsub_interceptors_handler.c   | 68 ++++++++++------------
 16 files changed, 94 insertions(+), 80 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
index 941c43b..3071a25 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
@@ -35,9 +35,9 @@ static const char *const SEQUENCE_NUMBER = "sequence.number";
 celix_status_t firstInterceptor_create(first_interceptor_t **interceptor);
 celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor);
 
-bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 
 #endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
index dc8ebaa..b37abad 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
@@ -28,9 +28,9 @@ typedef struct second_interceptor {
 celix_status_t secondInterceptor_create(second_interceptor_t **interceptor);
 celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor);
 
-bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 
 #endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
index e15a007..8414d8d 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
@@ -41,7 +41,7 @@ celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) {
 }
 
 
-bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     first_interceptor_t *interceptor = handle;
     celixThreadMutex_lock(&interceptor->mutex);
 
@@ -54,19 +54,19 @@ bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *pro
     return true;
 }
 
-void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
     printf("Invoked postSend on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
 }
 
-bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
     printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
 
     return true;
 }
 
-void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
     printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
 }
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
index 3e18b9c..d89c553 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
@@ -36,23 +36,23 @@ celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) {
 }
 
 
-bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     printf("Invoked preSend on second interceptor\n");
 
     return true;
 }
 
-void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     printf("Invoked postSend on second interceptor\n");
 }
 
-bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     printf("Invoked preReceive on second interceptor\n");
 
     return true;
 }
 
-void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     printf("Invoked postReceive on second interceptor\n");
 }
 
diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
index 181eff3..8f500f5 100644
--- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
+++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
@@ -62,23 +62,35 @@ std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<ce
         delete inter;
     }};
     interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true);
-    interceptor->postSend = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
-                               const celix_properties_t *) {
+    interceptor->preSend  = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                               const void *, celix_properties_t* metadata) {
+        celix_properties_set(metadata, "test", "preSend");
+        return true;
+    };
+    interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+                               const celix_properties_t* metadata) {
         auto* ser = (pubsub_serializer_handler_t*)handle;
         serializeAndPrint(ser, msgId, rawMsg);
         EXPECT_STREQ(msgType, "msg");
         const auto *msg = static_cast<const msg_t*>(rawMsg);
         EXPECT_GE(msg->seqNr, 0);
-        fprintf(stdout, "Got message in postSend interceptor %p with seq nr %i\n", handle, msg->seqNr);
+        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend");
+        fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr);
+    };
+    interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                 const void *, celix_properties_t* metadata) {
+        celix_properties_set(metadata, "test", "preReceive");
+        return true;
     };
-    interceptor->postReceive = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
-                                  const celix_properties_t *) {
+    interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+                                  const celix_properties_t* metadata) {
         auto* ser = (pubsub_serializer_handler_t*)handle;
         serializeAndPrint(ser, msgId, rawMsg);
         EXPECT_STREQ(msgType, "msg");
         const auto *msg = static_cast<const msg_t*>(rawMsg);
         EXPECT_GE(msg->seqNr, 0);
-        fprintf(stdout, "Got message in postReceive interceptor %p with seq nr %i\n", handle, msg->seqNr);
+        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive");
+        fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr);
     };
     //note registering identical services to validate multiple interceptors
     return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build();
@@ -96,5 +108,7 @@ TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleRec
     auto reg2 = createInterceptor(ctx);
     auto reg3 = createInterceptor(ctx);
 
+    //TODO stop after a certain amount of messages sent
+    //TODO also test with tcp v2.
     sleep(5);
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index 8acb8e2..4178b50 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -144,7 +144,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
     const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
     const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
index b287ebd..32a3328 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
@@ -145,7 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     sender->isPassive = false;
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index ad321e8..36816f1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -124,7 +124,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = celix_utils_strdup(scope);
     receiver->topic = celix_utils_strdup(topic);
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+                                                                     pubsub_serializerHandler_getSerializationType(serializerHandler));
     const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
     const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
     const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 2c8daf4..3c58e84 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -122,7 +122,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+                                                                   pubsub_serializerHandler_getSerializationType(serializerHandler));
     sender->isPassive = false;
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 9cd99f4..62b2fbf 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -157,7 +157,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
index 1e95c0b..aba8893 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
@@ -157,7 +157,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
 
     //setting up zmq socket for ZMQ TopicSender
     {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0fc2bc2..d6c5805 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -138,7 +138,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
     receiver->topic = celix_utils_strdup(topic);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE,
+                                                                     pubsub_serializerHandler_getSerializationType(serHandler));
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 7d9e750..d1f36a5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -142,7 +142,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     }
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE,
+                                                                   pubsub_serializerHandler_getSerializationType(serializerHandler));
 
     //setting up zmq socket for ZMQ TopicSender
     {
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
index eff1e78..69bffa9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -30,11 +30,13 @@ extern "C" {
 #include "celix_properties.h"
 
 #define PUBSUB_INTERCEPTOR_SERVICE_NAME     "pubsub.interceptor"
-#define PUBSUB_INTERCEPTOR_SERVICE_VERSION  "1.0.0"
+#define PUBSUB_INTERCEPTOR_SERVICE_VERSION  "2.0.0"
 
 typedef struct pubsub_interceptor_properties {
-    const char *scope;
-    const char *topic;
+    const char* psaType; //e.g. zmq, tcp, etc
+    const char* serializationType; //e.g. json, avrobin
+    const char* scope;
+    const char* topic;
 } pubsub_interceptor_properties_t;
 
 /**
@@ -60,7 +62,7 @@ struct pubsub_interceptor {
      * @param metadata The metadata of the message
      * @return True if the send should continue.
      */
-    bool (*preSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    bool (*preSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
 
     /**
      * @brief postSend will be called when a user called send on a pubsub/publisher, but after the message is "handed over" to the actual pubsub technology (i.e. TCP stack,  shared memory, etc)
@@ -74,7 +76,7 @@ struct pubsub_interceptor {
      * @param message The actual message pointer
      * @param metadata The metadata of the message
      */
-    void (*postSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    void (*postSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 
     /**
      * @brief preReceive will be called when is message is received in a pubsub admin, but before the pubsub/subscriber callback is called.
@@ -89,7 +91,7 @@ struct pubsub_interceptor {
      * @param metadata The metadata of the message
      * @return True if the pubsub/subsciber callback should be called.
      */
-    bool (*preReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    bool (*preReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
 
     /**
      * @brief postReceive will be called when is message is received in a pubsub admin and is called after the pubsub/subscriber callback is called.
@@ -103,7 +105,7 @@ struct pubsub_interceptor {
      * @param message The actual message pointer
      * @param metadata The metadata of the message
      */
-    void (*postReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    void (*postReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 };
 
 typedef struct pubsub_interceptor pubsub_interceptor_t;
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 6595853..a12a865 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -32,13 +32,13 @@
 
 typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
 
-celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
-celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t* ctx, const char* scope, const char* topic,  const char* psaType, const char* serializationType);
+void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
 
-bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
 
 #ifdef __cplusplus
 }
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 6118fbe..8191f94 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -45,45 +45,39 @@ static int referenceCompare(const void *a, const void *b);
 static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props);
 static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props);
 
-celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    *handler = calloc(1, sizeof(**handler));
-    if (!*handler) {
-        status = CELIX_ENOMEM;
-    } else {
-        (*handler)->ctx = ctx;
-
-        (*handler)->properties.scope = scope;
-        (*handler)->properties.topic = topic;
-
-        (*handler)->interceptors = celix_arrayList_create();
-
-        status = celixThreadMutex_create(&(*handler)->lock, NULL);
-
-        if (status == CELIX_SUCCESS) {
-            // Create service tracker here, and not in the activator
-            celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-            opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
-            opts.filter.ignoreServiceLanguage = true;
-            opts.callbackHandle = *handler;
-            opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
-            opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
-            (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-        }
-    }
-
-    return status;
+pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, const char* psaType, const char* serType) {
+    pubsub_interceptors_handler_t* handler = calloc(1, sizeof(*handler));
+    handler->ctx = ctx;
+    handler->properties.scope = celix_utils_strdup(scope);
+    handler->properties.topic = celix_utils_strdup(topic);
+    handler->properties.psaType = celix_utils_strdup(psaType);
+    handler->properties.serializationType = celix_utils_strdup(serType);
+    handler->interceptors = celix_arrayList_create();
+    celixThreadMutex_create(&handler->lock, NULL);
+
+    // Create service tracker here, and not in the activator
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
+    opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
+    handler->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    return handler;
 }
 
-celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
-    celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
-
-    celix_arrayList_destroy(handler->interceptors);
-    celixThreadMutex_destroy(&handler->lock);
-    free(handler);
-
-    return CELIX_SUCCESS;
+void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
+    if (handler != NULL) {
+        celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
+        celix_arrayList_destroy(handler->interceptors);
+        celixThreadMutex_destroy(&handler->lock);
+        free((char*)handler->properties.scope);
+        free((char*)handler->properties.topic);
+        free((char*)handler->properties.psaType);
+        free((char*)handler->properties.serializationType);
+        free(handler);
+    }
 }
 
 void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {