You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/07/28 07:36:41 UTC

[celix] branch feature/pubsub-interceptor-fix updated: Updates pubsub tcp receiver so that no extra message gets deserialized after a subscriber takes ownership if this is not needed.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/pubsub-interceptor-fix by this push:
     new 87491ea  Updates pubsub tcp receiver so that no extra message gets deserialized after a subscriber takes ownership if this is not needed.
87491ea is described below

commit 87491eaef5ecf56759e6f6920dcc2213ceadbff7
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed Jul 28 09:36:23 2021 +0200

    Updates pubsub tcp receiver so that no extra message gets deserialized after a subscriber takes ownership if this is not needed.
---
 .../v2/src/pubsub_tcp_topic_receiver.c             | 23 ++++++++--
 .../include/pubsub_interceptors_handler.h          | 35 +++++++++++++++
 .../pubsub_spi/src/pubsub_interceptors_handler.c   |  8 ++++
 .../src/PubSubSerializationHandlerTestSuite.cc     | 51 +++++++++++++++-------
 .../pubsub_utils/src/pubsub_serializer_handler.c   |  6 +++
 5 files changed, 105 insertions(+), 18 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 6945975..597e4ff 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -437,8 +437,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
         psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL && entry->subscriberSvc->receive != NULL) {
             entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
-            if (!(*release)) {
-                //receive function has taken ownership, deserialize again for new message
+            if (!(*release) && hashMapIterator_hasNext(&iter)) { //receive function has taken ownership, deserialize again for new message
                 struct iovec deSerializeBuffer;
                 deSerializeBuffer.iov_base = message->payload.payload;
                 deSerializeBuffer.iov_len = message->payload.length;
@@ -452,6 +451,8 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
                            receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
                     break;
                 }
+            } else if (!(*release)) { //receive function has taken ownership, but no receive left anymore. set msg to null
+                *msg = NULL;
             }
             *release = true;
         }
@@ -492,7 +493,23 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
             bool release = true;
             if (cont) {
                 callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
-                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
+                if (pubsubInterceptorHandler_nrOfInterceptors(receiver->interceptorsHandler) > 0) {
+                    if (deSerializedMsg == NULL) { //message deleted, but still need to call interceptors -> deserialize new message
+                        release = true;
+                        status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId,
+                                                                                 message->header.msgMajorVersion,
+                                                                                 message->header.msgMinorVersion,
+                                                                                 &deSerializeBuffer, 0, &deSerializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                        } else {
+                            pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
+                        }
+                    } else {
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
+                    }
+                }
             } else {
                 L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", msgFqn);
             }
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
index a12a865..4d8c58d 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h
@@ -32,14 +32,49 @@
 
 typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t;
 
+/**
+ * @brief Creates a pubsub interceptor handler for a specific scope, topic, psa type and serialization type.
+ *
+ * A interceptor handler will track pubsub interceptors and will call these interceptors when using the
+ * invokePreSend, invokePostSend, invokePreReceive and invokePostReceive functions.
+ *
+ * The interceptor handler will forward the topic, scope, psa type and serialization type info to every interceptor
+ * functions as pubsub_interceptor_properties_t.
+ *
+ */
 pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t* ctx, const char* scope, const char* topic,  const char* psaType, const char* serializationType);
+
+/**
+ * @brief Destroy the interceptor handler
+ * @param handler
+ */
 void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler);
 
+/**
+ * @brief Calls all the tracked interceptor service preSend functions.
+ */
 bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+
+/**
+ * @brief Calls all the tracked interceptor service postSend functions.
+ */
 void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
+
+/**
+ * @brief Calls all the tracked interceptor service preReceive functions.
+ */
 bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata);
+
+/**
+ * @brief Calls all the tracked interceptor service postReceive functions.
+ */
 void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata);
 
+/**
+ * @brief Return the nr of interceptors currently tracked.
+ */
+size_t pubsubInterceptorHandler_nrOfInterceptors(pubsub_interceptors_handler_t *handler);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index 8191f94..5413d02 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -204,4 +204,12 @@ int referenceCompare(const void *a, const void *b) {
     long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
 
     return celix_utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
+
+size_t pubsubInterceptorHandler_nrOfInterceptors(pubsub_interceptors_handler_t *handler) {
+    size_t nr = 0;
+    celixThreadMutex_lock(&handler->lock);
+    nr = (size_t)celix_arrayList_size(handler->interceptors);
+    celixThreadMutex_unlock(&handler->lock);
+    return nr;
 }
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index 77a2fa0..f55ec06 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -206,18 +206,21 @@ TEST_F(PubSubSerializationHandlerTestSuite, NoBackwardsCompatbile) {
 
 TEST_F(PubSubSerializationHandlerTestSuite, CallServiceMethods) {
     auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", false);
-
     long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
 
+    void* dummyMsg = (void*)0x42;
+    iovec* dummyBuffer = (iovec*)0x43;
+    size_t dummyBufferSize = 0;
+
     EXPECT_EQ(1, pubsub_serializerHandler_messageSerializationServiceCount(handler));
     EXPECT_EQ(0, serializeCallCount);
     EXPECT_EQ(0, freeSerializedMsgCallCount);
     EXPECT_EQ(0, deserializeCallCount);
     EXPECT_EQ(0, freeDeserializedMsgCallCount);
-    pubsub_serializerHandler_serialize(handler, 42, nullptr, nullptr, nullptr);
-    pubsub_serializerHandler_freeSerializedMsg(handler, 42, nullptr, 0);
-    pubsub_serializerHandler_deserialize(handler, 42, 1, 0, nullptr, 0, nullptr);
-    pubsub_serializerHandler_freeDeserializedMsg(handler, 42, nullptr);
+    pubsub_serializerHandler_serialize(handler, 42, dummyMsg, &dummyBuffer, &dummyBufferSize);
+    pubsub_serializerHandler_freeSerializedMsg(handler, 42, dummyBuffer, dummyBufferSize);
+    pubsub_serializerHandler_deserialize(handler, 42, 1, 0, dummyBuffer, dummyBufferSize, &dummyMsg);
+    pubsub_serializerHandler_freeDeserializedMsg(handler, 42, dummyMsg);
     EXPECT_EQ(1, serializeCallCount);
     EXPECT_EQ(1, freeSerializedMsgCallCount);
     EXPECT_EQ(1, deserializeCallCount);
@@ -232,16 +235,20 @@ TEST_F(PubSubSerializationHandlerTestSuite, MismatchedCallServiceMethods) {
 
     long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
 
+    void* dummyMsg = (void*)0x42;
+    iovec* dummyBuffer = (iovec*)0x43;
+    size_t dummyBufferSize = 0;
+
     EXPECT_EQ(1, pubsub_serializerHandler_messageSerializationServiceCount(handler));
     EXPECT_EQ(0, serializeCallCount);
     EXPECT_EQ(0, freeSerializedMsgCallCount);
     EXPECT_EQ(0, deserializeCallCount);
     EXPECT_EQ(0, freeDeserializedMsgCallCount);
-    pubsub_serializerHandler_serialize(handler, 43, nullptr, nullptr, nullptr);
-    pubsub_serializerHandler_freeSerializedMsg(handler, 43, nullptr, 0);
-    pubsub_serializerHandler_deserialize(handler, 43, 1, 0, nullptr, 0, nullptr);
-    pubsub_serializerHandler_deserialize(handler, 42, 1, 1, nullptr, 0, nullptr); //note wrong version
-    pubsub_serializerHandler_freeDeserializedMsg(handler, 43, nullptr);
+    pubsub_serializerHandler_serialize(handler, 43, dummyMsg, &dummyBuffer, &dummyBufferSize);
+    pubsub_serializerHandler_freeSerializedMsg(handler, 43, dummyBuffer, dummyBufferSize);
+    pubsub_serializerHandler_deserialize(handler, 43, 1, 0, dummyBuffer, dummyBufferSize, &dummyMsg);
+    pubsub_serializerHandler_deserialize(handler, 42, 1, 1, dummyBuffer, dummyBufferSize, &dummyMsg); //note wrong version
+    pubsub_serializerHandler_freeDeserializedMsg(handler, 43, dummyMsg);
     EXPECT_EQ(0, serializeCallCount);
     EXPECT_EQ(0, freeSerializedMsgCallCount);
     EXPECT_EQ(0, deserializeCallCount);
@@ -253,15 +260,18 @@ TEST_F(PubSubSerializationHandlerTestSuite, MismatchedCallServiceMethods) {
 
 TEST_F(PubSubSerializationHandlerTestSuite, BackwardsCompatibleCall) {
     auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
-
     long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
 
+    void* dummyMsg = (void*)0x42;
+    iovec* dummyBuffer = (iovec*)0x43;
+    size_t dummyBufferSize = 0;
+
     EXPECT_EQ(1, pubsub_serializerHandler_messageSerializationServiceCount(handler));
     EXPECT_EQ(0, deserializeCallCount);
-    pubsub_serializerHandler_deserialize(handler, 42, 1, 0, nullptr, 0, nullptr);
-    pubsub_serializerHandler_deserialize(handler, 42, 1, 1, nullptr, 0, nullptr); //note compatible
-    pubsub_serializerHandler_deserialize(handler, 42, 1, 15, nullptr, 0, nullptr); //note compatible
-    pubsub_serializerHandler_deserialize(handler, 42, 2, 9, nullptr, 0, nullptr); //note not compatible
+    pubsub_serializerHandler_deserialize(handler, 42, 1, 0, dummyBuffer, dummyBufferSize, &dummyMsg);
+    pubsub_serializerHandler_deserialize(handler, 42, 1, 1, dummyBuffer, dummyBufferSize, &dummyMsg); //note compatible
+    pubsub_serializerHandler_deserialize(handler, 42, 1, 15, dummyBuffer, dummyBufferSize, &dummyMsg); //note compatible
+    pubsub_serializerHandler_deserialize(handler, 42, 2, 9, dummyBuffer, dummyBufferSize, &dummyMsg); //note not compatible
     EXPECT_EQ(3, deserializeCallCount);
 
     celix_bundleContext_unregisterService(ctx.get(), svcId1);
@@ -313,4 +323,15 @@ TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) {
 
     celix_bundleContext_unregisterService(ctx.get(), svcId1);
     pubsub_serializerHandler_destroy(handler);
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, CallingFreeWithNULLWillBeSilentlyIgnored) {
+    auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+    long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+
+    EXPECT_EQ(pubsub_serializerHandler_freeDeserializedMsg(handler, 42, nullptr), CELIX_SUCCESS);
+    EXPECT_EQ(pubsub_serializerHandler_freeSerializedMsg(handler, 42, nullptr, 10), CELIX_SUCCESS);
+
+    celix_bundleContext_unregisterService(ctx.get(), svcId1);
+    pubsub_serializerHandler_destroy(handler);
 }
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index e339795..f39b36e 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -315,6 +315,9 @@ celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* h
 
 celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen) {
     celix_status_t status = CELIX_SUCCESS;
+    if (input == NULL) {
+        return status; //silently ignore
+    }
     celixThreadRwlock_readLock(&handler->lock);
     pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
     if (entry != NULL) {
@@ -353,6 +356,9 @@ celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t*
 
 celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg) {
     celix_status_t status = CELIX_SUCCESS;
+    if (msg == NULL) {
+         return status; //silently ignore
+    }
     celixThreadRwlock_readLock(&handler->lock);
     pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
     if (entry != NULL) {