You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/06/28 19:42:58 UTC
[celix] 07/07: Updates the interceptor api so that metadata can be
extended in the preSend/Receive callbacks.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 6229f9e0317ae370b11ed3a99b2b6907763abbd0
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Jun 28 21:42:18 2021 +0200
Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks.
---
.../include/first_interceptor_private.h | 8 +--
.../include/second_interceptor_private.h | 8 +--
.../pubsub/interceptors/src/first_interceptor.c | 8 +--
.../pubsub/interceptors/src/second_interceptor.c | 8 +--
.../gtest/PubSubInterceptorTestSuite.cc | 26 +++++++--
.../v1/src/pubsub_tcp_topic_receiver.c | 2 +-
.../v1/src/pubsub_tcp_topic_sender.c | 2 +-
.../v2/src/pubsub_tcp_topic_receiver.c | 3 +-
.../v2/src/pubsub_tcp_topic_sender.c | 3 +-
.../v1/src/pubsub_zmq_topic_receiver.c | 2 +-
.../v1/src/pubsub_zmq_topic_sender.c | 2 +-
.../v2/src/pubsub_zmq_topic_receiver.c | 3 +-
.../v2/src/pubsub_zmq_topic_sender.c | 3 +-
.../pubsub/pubsub_spi/include/pubsub_interceptor.h | 16 ++---
.../include/pubsub_interceptors_handler.h | 12 ++--
.../pubsub_spi/src/pubsub_interceptors_handler.c | 68 ++++++++++------------
16 files changed, 94 insertions(+), 80 deletions(-)
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
index 941c43b..3071a25 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h
@@ -35,9 +35,9 @@ static const char *const SEQUENCE_NUMBER = "sequence.number";
celix_status_t firstInterceptor_create(first_interceptor_t **interceptor);
celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor);
-bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
#endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
index dc8ebaa..b37abad 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
+++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h
@@ -28,9 +28,9 @@ typedef struct second_interceptor {
celix_status_t secondInterceptor_create(second_interceptor_t **interceptor);
celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor);
-bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
-void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
+void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
#endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
index e15a007..8414d8d 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c
@@ -41,7 +41,7 @@ celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) {
}
-bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
first_interceptor_t *interceptor = handle;
celixThreadMutex_lock(&interceptor->mutex);
@@ -54,19 +54,19 @@ bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *pro
return true;
}
-void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
printf("Invoked postSend on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
}
-bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
return true;
}
-void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0);
printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence);
}
diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
index 3e18b9c..d89c553 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c
@@ -36,23 +36,23 @@ celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) {
}
-bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
printf("Invoked preSend on second interceptor\n");
return true;
}
-void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
printf("Invoked postSend on second interceptor\n");
}
-bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) {
printf("Invoked preReceive on second interceptor\n");
return true;
}
-void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
+void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) {
printf("Invoked postReceive on second interceptor\n");
}
diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
index 181eff3..8f500f5 100644
--- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
+++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
@@ -62,23 +62,35 @@ std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<ce
delete inter;
}};
interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true);
- interceptor->postSend = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
- const celix_properties_t *) {
+ interceptor->preSend = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+ const void *, celix_properties_t* metadata) {
+ celix_properties_set(metadata, "test", "preSend");
+ return true;
+ };
+ interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+ const celix_properties_t* metadata) {
auto* ser = (pubsub_serializer_handler_t*)handle;
serializeAndPrint(ser, msgId, rawMsg);
EXPECT_STREQ(msgType, "msg");
const auto *msg = static_cast<const msg_t*>(rawMsg);
EXPECT_GE(msg->seqNr, 0);
- fprintf(stdout, "Got message in postSend interceptor %p with seq nr %i\n", handle, msg->seqNr);
+ EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend");
+ fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr);
+ };
+ interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t,
+ const void *, celix_properties_t* metadata) {
+ celix_properties_set(metadata, "test", "preReceive");
+ return true;
};
- interceptor->postReceive = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg,
- const celix_properties_t *) {
+ interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg,
+ const celix_properties_t* metadata) {
auto* ser = (pubsub_serializer_handler_t*)handle;
serializeAndPrint(ser, msgId, rawMsg);
EXPECT_STREQ(msgType, "msg");
const auto *msg = static_cast<const msg_t*>(rawMsg);
EXPECT_GE(msg->seqNr, 0);
- fprintf(stdout, "Got message in postReceive interceptor %p with seq nr %i\n", handle, msg->seqNr);
+ EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive");
+ fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr);
};
//note registering identical services to validate multiple interceptors
return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build();
@@ -96,5 +108,7 @@ TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleRec
auto reg2 = createInterceptor(ctx);
auto reg3 = createInterceptor(ctx);
+ //TODO stop after a certain number of messages have been sent
+ //TODO also test with tcp v2.
sleep(5);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index 8acb8e2..4178b50 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -144,7 +144,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
- pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+ receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
index b287ebd..32a3328 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
@@ -145,7 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+ sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
sender->isPassive = false;
sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
char *urls = NULL;
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index ad321e8..36816f1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -124,7 +124,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
- pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+ receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+ pubsub_serializerHandler_getSerializationType(serializerHandler));
const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 2c8daf4..3c58e84 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -122,7 +122,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+ sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+ pubsub_serializerHandler_getSerializationType(serializerHandler));
sender->isPassive = false;
char *urls = NULL;
const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 9cd99f4..62b2fbf 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -157,7 +157,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->topic = strndup(topic, 1024 * 1024);
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
- pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+ receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
#ifdef BUILD_WITH_ZMQ_SECURITY
char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
index 1e95c0b..aba8893 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
@@ -157,7 +157,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
- pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+ sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
//setting up zmq socket for ZMQ TopicSender
{
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0fc2bc2..d6c5805 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -138,7 +138,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
- pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+ receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE,
+ pubsub_serializerHandler_getSerializationType(serHandler));
#ifdef BUILD_WITH_ZMQ_SECURITY
char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 7d9e750..d1f36a5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -142,7 +142,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
}
sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
- pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+ sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE,
+ pubsub_serializerHandler_getSerializationType(serializerHandler));
//setting up zmq socket for ZMQ TopicSender
{
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
index eff1e78..69bffa9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h
@@ -30,11 +30,13 @@ extern "C" {
#include "celix_properties.h"
#define PUBSUB_INTERCEPTOR_SERVICE_NAME "pubsub.interceptor"
-#define PUBSUB_INTERCEPTOR_SERVICE_VERSION "1.0.0"
+#define PUBSUB_INTERCEPTOR_SERVICE_VERSION "2.0.0"
typedef struct pubsub_interceptor_properties {
- const char *scope;
- const char *topic;
+ const char* psaType; //e.g. zmq, tcp, etc.
+ const char* serializationType; //e.g. json, avrobin
+ const char* scope;
+ const char* topic;
} pubsub_interceptor_properties_t;
/**
@@ -60,7 +62,7 @@ struct pubsub_interceptor {
* @param metadata The metadata of the message
* @return True if the send should continue.
*/
- bool (*preSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+ bool (*preSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
/**
* @brief postSend will be called when a user called send on a pubsub/publisher, but after the message is "handed over" to the actual pubsub technology (i.e. TCP stack, shared memory, etc)
@@ -74,7 +76,7 @@ struct pubsub_interceptor {
* @param message The actual message pointer
* @param metadata The metadata of the message
*/
- void (*postSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+ void (*postSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
/**
* @brief preReceive will be called when a message is received in a pubsub admin, but before the pubsub/subscriber callback is called.
@@ -89,7 +91,7 @@ struct pubsub_interceptor {
* @param metadata The metadata of the message
* @return True if the pubsub/subscriber callback should be called.
*/
- bool (*preReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+ bool (*preReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata);
/**
* @brief postReceive will be called when a message is received in a pubsub admin and is called after the pubsub/subscriber callback is called.
@@ -103,7 +105,7 @@ struct pubsub_interceptor {
* @param message The actual message pointer
* @param metadata The metadata of the message
*/
- void (*postReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
+ void (*postReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata);
};
typedef struct pubsub_interceptor pubsub_interceptor_t;
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index 6595853..a12a865 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -32,13 +32,13 @@
typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
-celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler);
-celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t* ctx, const char* scope, const char* topic, const char* psaType, const char* serializationType);
+void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
-bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
-bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata);
-void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata);
+bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
+bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
#ifdef __cplusplus
}
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 6118fbe..8191f94 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -45,45 +45,39 @@ static int referenceCompare(const void *a, const void *b);
static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props);
static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props);
-celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) {
- celix_status_t status = CELIX_SUCCESS;
-
- *handler = calloc(1, sizeof(**handler));
- if (!*handler) {
- status = CELIX_ENOMEM;
- } else {
- (*handler)->ctx = ctx;
-
- (*handler)->properties.scope = scope;
- (*handler)->properties.topic = topic;
-
- (*handler)->interceptors = celix_arrayList_create();
-
- status = celixThreadMutex_create(&(*handler)->lock, NULL);
-
- if (status == CELIX_SUCCESS) {
- // Create service tracker here, and not in the activator
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = *handler;
- opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
- opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
- (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
- }
-
- return status;
+pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, const char* psaType, const char* serType) {
+ pubsub_interceptors_handler_t* handler = calloc(1, sizeof(*handler));
+ handler->ctx = ctx;
+ handler->properties.scope = celix_utils_strdup(scope);
+ handler->properties.topic = celix_utils_strdup(topic);
+ handler->properties.psaType = celix_utils_strdup(psaType);
+ handler->properties.serializationType = celix_utils_strdup(serType);
+ handler->interceptors = celix_arrayList_create();
+ celixThreadMutex_create(&handler->lock, NULL);
+
+ // Create service tracker here, and not in the activator
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = handler;
+ opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor;
+ opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor;
+ handler->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+ return handler;
}
-celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
- celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
-
- celix_arrayList_destroy(handler->interceptors);
- celixThreadMutex_destroy(&handler->lock);
- free(handler);
-
- return CELIX_SUCCESS;
+void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) {
+ if (handler != NULL) {
+ celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId);
+ celix_arrayList_destroy(handler->interceptors);
+ celixThreadMutex_destroy(&handler->lock);
+ free((char*)handler->properties.scope);
+ free((char*)handler->properties.topic);
+ free((char*)handler->properties.psaType);
+ free((char*)handler->properties.serializationType);
+ free(handler);
+ }
}
void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {