You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/06/28 19:42:58 UTC

[celix] 07/07: Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 6229f9e0317ae370b11ed3a99b2b6907763abbd0
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 28 21:42:18 2021 +0200

    Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.
---
 .../include/first_interceptor_private.h            |  8 +--
 .../include/second_interceptor_private.h           |  8 +--
 .../pubsub/interceptors/src/first_interceptor.c    |  8 +--
 .../pubsub/interceptors/src/second_interceptor.c   |  8 +--
 .../gtest/PubSubInterceptorTestSuite.cc            | 26 +++++++--
 .../v1/src/pubsub_tcp_topic_receiver.c             |  2 +-
 .../v1/src/pubsub_tcp_topic_sender.c               |  2 +-
 .../v2/src/pubsub_tcp_topic_receiver.c             |  3 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |  3 +-
 .../v1/src/pubsub_zmq_topic_receiver.c             |  2 +-
 .../v1/src/pubsub_zmq_topic_sender.c               |  2 +-
 .../v2/src/pubsub_zmq_topic_receiver.c             |  3 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |  3 +-
 .../pubsub/pubsub_spi/include/pubsub_interceptor.h | 16 ++---
 .../include/pubsub_interceptors_handler.h          | 12 ++--
 .../pubsub_spi/src/pubsub_interceptors_handler.c   | 68 ++++++++++------------
 16 files changed, 94 insertions(+), 80 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
index 941c43b..3071a25 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
@@ -35,9 +35,9 @@ static const char *const SEQUENCE_NUMBER = "sequence.number";
 celix_status_t firstInterceptor_create(first_interceptor_t **interceptor);
 celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor);
 
-bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 
 #endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
index dc8ebaa..b37abad 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
@@ -28,9 +28,9 @@ typedef struct second_interceptor {
 celix_status_t secondInterceptor_create(second_interceptor_t **interceptor);
 celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor);
 
-bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 
 #endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
index e15a007..8414d8d 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
@@ -41,7 +41,7 @@ celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) {
 }
 
 
-bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     first_interceptor_t *interceptor = handle;
     celixThreadMutex_lock(&interceptor->mutex);
 
@@ -54,19 +54,19 @@ bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *pro
     return true;
 }
 
-void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
     printf("Invoked postSend on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
 }
 
-bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
     printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
 
     return true;
 }
 
-void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
     printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
 }
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
index 3e18b9c..d89c553 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
@@ -36,23 +36,23 @@ celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) {
 }
 
 
-bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     printf("Invoked preSend on second interceptor\n");
 
     return true;
 }
 
-void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     printf("Invoked postSend on second interceptor\n");
 }
 
-bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
     printf("Invoked preReceive on second interceptor\n");
 
     return true;
 }
 
-void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
     printf("Invoked postReceive on second interceptor\n");
 }
 
diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
index 181eff3..8f500f5 100644
--- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
+++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
@@ -62,23 +62,35 @@ std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<ce
         delete inter;
     }};
     interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true);
-    interceptor->postSend = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
-                               const celix_properties_t *) {
+    interceptor->preSend  = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                               const void *, celix_properties_t* metadata) {
+        celix_properties_set(metadata, "test", "preSend");
+        return true;
+    };
+    interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+                               const celix_properties_t* metadata) {
         auto* ser = (pubsub_serializer_handler_t*)handle;
         serializeAndPrint(ser, msgId, rawMsg);
         EXPECT_STREQ(msgType, "msg");
         const auto *msg = static_cast<const msg_t*>(rawMsg);
         EXPECT_GE(msg->seqNr, 0);
-        fprintf(stdout, "Got message in postSend interceptor %p with seq nr %i\n", handle, msg->seqNr);
+        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend");
+        fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr);
+    };
+    interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                 const void *, celix_properties_t* metadata) {
+        celix_properties_set(metadata, "test", "preReceive");
+        return true;
     };
-    interceptor->postReceive = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
-                                  const celix_properties_t *) {
+    interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+                                  const celix_properties_t* metadata) {
         auto* ser = (pubsub_serializer_handler_t*)handle;
         serializeAndPrint(ser, msgId, rawMsg);
         EXPECT_STREQ(msgType, "msg");
         const auto *msg = static_cast<const msg_t*>(rawMsg);
         EXPECT_GE(msg->seqNr, 0);
-        fprintf(stdout, "Got message in postReceive interceptor %p with seq nr %i\n", handle, msg->seqNr);
+        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive");
+        fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr);
     };
     //note registering identical services to validate multiple interceptors
     return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build();
@@ -96,5 +108,7 @@ TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleRec
     auto reg2 = createInterceptor(ctx);
     auto reg3 = createInterceptor(ctx);
 
+    //TODO stop after a certain amount of messages send
+    //TODO also test with tcp v2.
     sleep(5);
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index 8acb8e2..4178b50 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -144,7 +144,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
     const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
     const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
index b287ebd..32a3328 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
@@ -145,7 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     sender->isPassive = false;
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index ad321e8..36816f1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -124,7 +124,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = celix_utils_strdup(scope);
     receiver->topic = celix_utils_strdup(topic);
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+                                                                     pubsub_serializerHandler_getSerializationType(serializerHandler));
     const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
     const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
     const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 2c8daf4..3c58e84 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -122,7 +122,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+                                                                   pubsub_serializerHandler_getSerializationType(serializerHandler));
     sender->isPassive = false;
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 9cd99f4..62b2fbf 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -157,7 +157,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
index 1e95c0b..aba8893 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
@@ -157,7 +157,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
 
     //setting up zmq socket for ZMQ TopicSender
     {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0fc2bc2..d6c5805 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -138,7 +138,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
     receiver->topic = celix_utils_strdup(topic);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE,
+                                                                     pubsub_serializerHandler_getSerializationType(serHandler));
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 7d9e750..d1f36a5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -142,7 +142,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     }
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
-    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE,
+                                                                   pubsub_serializerHandler_getSerializationType(serializerHandler));
 
     //setting up zmq socket for ZMQ TopicSender
     {
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
index eff1e78..69bffa9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -30,11 +30,13 @@ extern "C" {
 #include "celix_properties.h"
 
 #define PUBSUB_INTERCEPTOR_SERVICE_NAME     "pubsub.interceptor"
-#define PUBSUB_INTERCEPTOR_SERVICE_VERSION  "1.0.0"
+#define PUBSUB_INTERCEPTOR_SERVICE_VERSION  "2.0.0"
 
 typedef struct pubsub_interceptor_properties {
-    const char *scope;
-    const char *topic;
+    const char* psaType; //i.e. zmq, tcp, etc
+    const char* serializationType; //i.e. json, avrobin
+    const char* scope;
+    const char* topic;
 } pubsub_interceptor_properties_t;
 
 /**
@@ -60,7 +62,7 @@ struct pubsub_interceptor {
      * @param metadata The metadata of the message
      * @return True if the send should continue.
      */
-    bool (*preSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    bool (*preSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
 
     /**
      * @brief postSend will be called when a user called send on a pubsub/publisher, but after the message is "handed over" to the actual pubsub technology (i.e. TCP stack,  shared memory, etc)
@@ -74,7 +76,7 @@ struct pubsub_interceptor {
      * @param message The actual message pointer
      * @param metadata The metadata of the message
      */
-    void (*postSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    void (*postSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 
     /**
      * @brief preReceive will be called when is message is received in a pubsub admin, but before the pubsub/subscriber callback is called.
@@ -89,7 +91,7 @@ struct pubsub_interceptor {
      * @param metadata The metadata of the message
      * @return True if the pubsub/subsciber callback should be called.
      */
-    bool (*preReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    bool (*preReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
 
     /**
      * @brief postReceive will be called when is message is received in a pubsub admin and is called after the pubsub/subscriber callback is called.
@@ -103,7 +105,7 @@ struct pubsub_interceptor {
      * @param message The actual message pointer
      * @param metadata The metadata of the message
      */
-    void (*postReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+    void (*postReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
 };
 
 typedef struct pubsub_interceptor pubsub_interceptor_t;
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 6595853..a12a865 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -32,13 +32,13 @@
 
 typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
 
-celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
-celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t* ctx, const char* scope, const char* topic,  const char* psaType, const char* serializationType);
+void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
 
-bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
 
 #ifdef __cplusplus
 }
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 6118fbe..8191f94 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -45,45 +45,39 @@ static int referenceCompare(const void *a, const void *b);
 static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props);
 static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props);
 
-celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    *handler = calloc(1, sizeof(**handler));
-    if (!*handler) {
-        status = CELIX_ENOMEM;
-    } else {
-        (*handler)->ctx = ctx;
-
-        (*handler)->properties.scope = scope;
-        (*handler)->properties.topic = topic;
-
-        (*handler)->interceptors = celix_arrayList_create();
-
-        status = celixThreadMutex_create(&(*handler)->lock, NULL);
-
-        if (status == CELIX_SUCCESS) {
-            // Create service tracker here, and not in the activator
-            celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-            opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
-            opts.filter.ignoreServiceLanguage = true;
-            opts.callbackHandle = *handler;
-            opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
-            opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
-            (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-        }
-    }
-
-    return status;
+pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, const char* psaType, const char* serType) {
+    pubsub_interceptors_handler_t* handler = calloc(1, sizeof(*handler));
+    handler->ctx = ctx;
+    handler->properties.scope = celix_utils_strdup(scope);
+    handler->properties.topic = celix_utils_strdup(topic);
+    handler->properties.psaType = celix_utils_strdup(psaType);
+    handler->properties.serializationType = celix_utils_strdup(serType);
+    handler->interceptors = celix_arrayList_create();
+    celixThreadMutex_create(&handler->lock, NULL);
+
+    // Create service tracker here, and not in the activator
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
+    opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
+    handler->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    return handler;
 }
 
-celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
-    celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
-
-    celix_arrayList_destroy(handler->interceptors);
-    celixThreadMutex_destroy(&handler->lock);
-    free(handler);
-
-    return CELIX_SUCCESS;
+void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
+    if (handler != NULL) {
+        celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
+        celix_arrayList_destroy(handler->interceptors);
+        celixThreadMutex_destroy(&handler->lock);
+        free((char*)handler->properties.scope);
+        free((char*)handler->properties.topic);
+        free((char*)handler->properties.psaType);
+        free((char*)handler->properties.serializationType);
+        free(handler);
+    }
 }
 
 void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {