You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/07/27 18:21:02 UTC

[celix] branch feature/pubsub-interceptor-fix updated: Enables pubsub tcp interceptor tests and fixes mem leaks

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/pubsub-interceptor-fix by this push:
     new 90497a9  Enables pubsub tcp interceptor tests and fixes mem leaks
90497a9 is described below

commit 90497a9037cf9e43e0702779da570a5adbcbbdd2
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue Jul 27 20:20:47 2021 +0200

    Enables pubsub tcp interceptor tests and fixes mem leaks
---
 bundles/pubsub/integration/CMakeLists.txt                        | 5 ++---
 bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c      | 9 ++++++++-
 .../pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c   | 5 +++++
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 59207a7..7ff5fe1 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -838,9 +838,8 @@ if (BUILD_PUBSUB_PSA_WS)
 endif ()
 
 if (BUILD_PUBSUB_PSA_TCP)
-    message(STATUS "TODO enable tcp and interceptors. Currently has a memleak")
-    #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
-    #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
+    add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
+    add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
 endif ()
 
 if (BUILD_PUBSUB_PSA_ZMQ)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
index 278ebd3..eb02f79 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
@@ -855,8 +855,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
     clock_gettime(CLOCK_REALTIME, &receiveTime);
     bool releaseEntryBuffer = false;
     handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime);
-    if (releaseEntryBuffer) pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+    if (releaseEntryBuffer) {
+      pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+    }
   }
+  celix_properties_destroy(entry->header.metadata.metadata);
+  entry->header.metadata.metadata = NULL;
 }
 
 static inline
@@ -1195,6 +1199,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 if (payloadData && (payloadData != message->payload.payload)) {
                     free(payloadData);
                 }
+                if (metadataData && metadataSize > 0) {
+                    free(metadataData);
+                }
             }
             celixThreadMutex_unlock(&entry->writeMutex);
         }
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index fe6fd53..6945975 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -487,6 +487,7 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
 
         if (status == CELIX_SUCCESS) {
             celix_properties_t *metadata = message->metadata.metadata;
+            bool metadataWasNull = metadata == NULL;
             bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata);
             bool release = true;
             if (cont) {
@@ -498,6 +499,10 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
             if (release) {
                 pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
             }
+            if (metadataWasNull) {
+                //note that if the metadata was created by the pubsubInterceptorHandler_invokePreReceive, this needs to be deallocated
+                celix_properties_destroy(metadata);
+            }
         } else {
             L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
                    receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);