You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/07/28 07:36:41 UTC
[celix] branch feature/pubsub-interceptor-fix updated: Updates
pubsub tcp receiver so that no extra message gets deserialized after a
subscriber takes ownership if this is not needed.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/pubsub-interceptor-fix by this push:
new 87491ea Updates pubsub tcp receiver so that no extra message gets deserialized after a subscriber takes ownership if this is not needed.
87491ea is described below
commit 87491eaef5ecf56759e6f6920dcc2213ceadbff7
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed Jul 28 09:36:23 2021 +0200
Updates pubsub tcp receiver so that no extra message gets deserialized after a subscriber takes ownership if this is not needed.
---
.../v2/src/pubsub_tcp_topic_receiver.c | 23 ++++++++--
.../include/pubsub_interceptors_handler.h | 35 +++++++++++++++
.../pubsub_spi/src/pubsub_interceptors_handler.c | 8 ++++
.../src/PubSubSerializationHandlerTestSuite.cc | 51 +++++++++++++++-------
.../pubsub_utils/src/pubsub_serializer_handler.c | 6 +++
5 files changed, 105 insertions(+), 18 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 6945975..597e4ff 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -437,8 +437,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
if (entry != NULL && entry->subscriberSvc->receive != NULL) {
entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
- if (!(*release)) {
- //receive function has taken ownership, deserialize again for new message
+ if (!(*release) && hashMapIterator_hasNext(&iter)) { //receive function has taken ownership, deserialize again for new message
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
@@ -452,6 +451,8 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
break;
}
+ } else if (!(*release)) { //receive function has taken ownership, but no receive left anymore. set msg to null
+ *msg = NULL;
}
*release = true;
}
@@ -492,7 +493,23 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
bool release = true;
if (cont) {
callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
+ if (pubsubInterceptorHandler_nrOfInterceptors(receiver->interceptorsHandler) > 0) {
+ if (deSerializedMsg == NULL) { //message deleted, but still need to call interceptors -> deserialize new message
+ release = true;
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+ message->header.msgMajorVersion,
+ message->header.msgMinorVersion,
+ &deSerializeBuffer, 0, &deSerializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ } else {
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
+ }
+ } else {
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
+ }
+ }
} else {
L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", msgFqn);
}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index a12a865..4d8c58d 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -32,14 +32,49 @@
typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
+/**
+ * @brief Creates a pubsub interceptor handler for a specific scope, topic, psa type and serialization type.
+ *
+ * A interceptor handler will track pubsub interceptors and will call these interceptors when using the
+ * invokePreSend, invokePostSend, invokePreReceive and invokePostReceive functions.
+ *
+ * The interceptor handler will forward the topic, scope, psa type and serialization type info to every interceptor
+ * functions as pubsub_interceptor_properties_t.
+ *
+ */
pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t* ctx, const char* scope, const char* topic, const char* psaType, const char* serializationType);
+
+/**
+ * @brief Destroy the interceptor handler
+ * @param handler
+ */
void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
+/**
+ * @brief Calls all the tracked interceptor service preSend functions.
+ */
bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+
+/**
+ * @brief Calls all the tracked interceptor service postSend functions.
+ */
void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
+
+/**
+ * @brief Calls all the tracked interceptor service preReceive functions.
+ */
bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+
+/**
+ * @brief Calls all the tracked interceptor service postReceive functions.
+ */
void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
+/**
+ * @brief Return the nr of interceptors currently tracked.
+ */
+size_t pubsubInterceptorHandler_nrOfInterceptors(pubsub_interceptors_handler_t *handler);
+
#ifdef __cplusplus
}
#endif
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 8191f94..5413d02 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -204,4 +204,12 @@ int referenceCompare(const void *a, const void *b) {
long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
return celix_utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
+
+size_t pubsubInterceptorHandler_nrOfInterceptors(pubsub_interceptors_handler_t *handler) {
+ size_t nr = 0;
+ celixThreadMutex_lock(&handler->lock);
+ nr = (size_t)celix_arrayList_size(handler->interceptors);
+ celixThreadMutex_unlock(&handler->lock);
+ return nr;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 77a2fa0..f55ec06 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -206,18 +206,21 @@ TEST_F(PubSubSerializationHandlerTestSuite, NoBackwardsCompatbile) {
TEST_F(PubSubSerializationHandlerTestSuite, CallServiceMethods) {
auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", false);
-
long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+ void* dummyMsg = (void*)0x42;
+ iovec* dummyBuffer = (iovec*)0x43;
+ size_t dummyBufferSize = 0;
+
EXPECT_EQ(1, pubsub_serializerHandler_messageSerializationServiceCount(handler));
EXPECT_EQ(0, serializeCallCount);
EXPECT_EQ(0, freeSerializedMsgCallCount);
EXPECT_EQ(0, deserializeCallCount);
EXPECT_EQ(0, freeDeserializedMsgCallCount);
- pubsub_serializerHandler_serialize(handler, 42, nullptr, nullptr, nullptr);
- pubsub_serializerHandler_freeSerializedMsg(handler, 42, nullptr, 0);
- pubsub_serializerHandler_deserialize(handler, 42, 1, 0, nullptr, 0, nullptr);
- pubsub_serializerHandler_freeDeserializedMsg(handler, 42, nullptr);
+ pubsub_serializerHandler_serialize(handler, 42, dummyMsg, &dummyBuffer, &dummyBufferSize);
+ pubsub_serializerHandler_freeSerializedMsg(handler, 42, dummyBuffer, dummyBufferSize);
+ pubsub_serializerHandler_deserialize(handler, 42, 1, 0, dummyBuffer, dummyBufferSize, &dummyMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(handler, 42, dummyMsg);
EXPECT_EQ(1, serializeCallCount);
EXPECT_EQ(1, freeSerializedMsgCallCount);
EXPECT_EQ(1, deserializeCallCount);
@@ -232,16 +235,20 @@ TEST_F(PubSubSerializationHandlerTestSuite, MismatchedCallServiceMethods) {
long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+ void* dummyMsg = (void*)0x42;
+ iovec* dummyBuffer = (iovec*)0x43;
+ size_t dummyBufferSize = 0;
+
EXPECT_EQ(1, pubsub_serializerHandler_messageSerializationServiceCount(handler));
EXPECT_EQ(0, serializeCallCount);
EXPECT_EQ(0, freeSerializedMsgCallCount);
EXPECT_EQ(0, deserializeCallCount);
EXPECT_EQ(0, freeDeserializedMsgCallCount);
- pubsub_serializerHandler_serialize(handler, 43, nullptr, nullptr, nullptr);
- pubsub_serializerHandler_freeSerializedMsg(handler, 43, nullptr, 0);
- pubsub_serializerHandler_deserialize(handler, 43, 1, 0, nullptr, 0, nullptr);
- pubsub_serializerHandler_deserialize(handler, 42, 1, 1, nullptr, 0, nullptr); //note wrong version
- pubsub_serializerHandler_freeDeserializedMsg(handler, 43, nullptr);
+ pubsub_serializerHandler_serialize(handler, 43, dummyMsg, &dummyBuffer, &dummyBufferSize);
+ pubsub_serializerHandler_freeSerializedMsg(handler, 43, dummyBuffer, dummyBufferSize);
+ pubsub_serializerHandler_deserialize(handler, 43, 1, 0, dummyBuffer, dummyBufferSize, &dummyMsg);
+ pubsub_serializerHandler_deserialize(handler, 42, 1, 1, dummyBuffer, dummyBufferSize, &dummyMsg); //note wrong version
+ pubsub_serializerHandler_freeDeserializedMsg(handler, 43, dummyMsg);
EXPECT_EQ(0, serializeCallCount);
EXPECT_EQ(0, freeSerializedMsgCallCount);
EXPECT_EQ(0, deserializeCallCount);
@@ -253,15 +260,18 @@ TEST_F(PubSubSerializationHandlerTestSuite, MismatchedCallServiceMethods) {
TEST_F(PubSubSerializationHandlerTestSuite, BackwardsCompatibleCall) {
auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
-
long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+ void* dummyMsg = (void*)0x42;
+ iovec* dummyBuffer = (iovec*)0x43;
+ size_t dummyBufferSize = 0;
+
EXPECT_EQ(1, pubsub_serializerHandler_messageSerializationServiceCount(handler));
EXPECT_EQ(0, deserializeCallCount);
- pubsub_serializerHandler_deserialize(handler, 42, 1, 0, nullptr, 0, nullptr);
- pubsub_serializerHandler_deserialize(handler, 42, 1, 1, nullptr, 0, nullptr); //note compatible
- pubsub_serializerHandler_deserialize(handler, 42, 1, 15, nullptr, 0, nullptr); //note compatible
- pubsub_serializerHandler_deserialize(handler, 42, 2, 9, nullptr, 0, nullptr); //note not compatible
+ pubsub_serializerHandler_deserialize(handler, 42, 1, 0, dummyBuffer, dummyBufferSize, &dummyMsg);
+ pubsub_serializerHandler_deserialize(handler, 42, 1, 1, dummyBuffer, dummyBufferSize, &dummyMsg); //note compatible
+ pubsub_serializerHandler_deserialize(handler, 42, 1, 15, dummyBuffer, dummyBufferSize, &dummyMsg); //note compatible
+ pubsub_serializerHandler_deserialize(handler, 42, 2, 9, dummyBuffer, dummyBufferSize, &dummyMsg); //note not compatible
EXPECT_EQ(3, deserializeCallCount);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
@@ -313,4 +323,15 @@ TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) {
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, CallingFreeWithNULLWillBeSilentlyIgnored) {
+ auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+ long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+
+ EXPECT_EQ(pubsub_serializerHandler_freeDeserializedMsg(handler, 42, nullptr), CELIX_SUCCESS);
+ EXPECT_EQ(pubsub_serializerHandler_freeSerializedMsg(handler, 42, nullptr, 10), CELIX_SUCCESS);
+
+ celix_bundleContext_unregisterService(ctx.get(), svcId1);
+ pubsub_serializerHandler_destroy(handler);
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index e339795..f39b36e 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -315,6 +315,9 @@ celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* h
celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen) {
celix_status_t status = CELIX_SUCCESS;
+ if (input == NULL) {
+ return status; //silently ignore
+ }
celixThreadRwlock_readLock(&handler->lock);
pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
if (entry != NULL) {
@@ -353,6 +356,9 @@ celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t*
celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg) {
celix_status_t status = CELIX_SUCCESS;
+ if (msg == NULL) {
+ return status; //silently ignore
+ }
celixThreadRwlock_readLock(&handler->lock);
pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
if (entry != NULL) {