You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/07/27 18:21:02 UTC
[celix] branch feature/pubsub-interceptor-fix updated: Enables pubsub tcp interceptor tests and fixes mem leaks
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/pubsub-interceptor-fix by this push:
new 90497a9 Enables pubsub tcp interceptor tests and fixes mem leaks
90497a9 is described below
commit 90497a9037cf9e43e0702779da570a5adbcbbdd2
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue Jul 27 20:20:47 2021 +0200
Enables pubsub tcp interceptor tests and fixes mem leaks
---
bundles/pubsub/integration/CMakeLists.txt | 5 ++---
bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c | 9 ++++++++-
.../pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c | 5 +++++
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 59207a7..7ff5fe1 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -838,9 +838,8 @@ if (BUILD_PUBSUB_PSA_WS)
endif ()
if (BUILD_PUBSUB_PSA_TCP)
- message(STATUS "TODO enable tcp and interceptors. Currently has a memleak")
- #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
- #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
+ add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
+ add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
endif ()
if (BUILD_PUBSUB_PSA_ZMQ)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
index 278ebd3..eb02f79 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
@@ -855,8 +855,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
clock_gettime(CLOCK_REALTIME, &receiveTime);
bool releaseEntryBuffer = false;
handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime);
- if (releaseEntryBuffer) pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+ if (releaseEntryBuffer) {
+ pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+ }
}
+ celix_properties_destroy(entry->header.metadata.metadata);
+ entry->header.metadata.metadata = NULL;
}
static inline
@@ -1195,6 +1199,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (payloadData && (payloadData != message->payload.payload)) {
free(payloadData);
}
+ if (metadataData && metadataSize > 0) {
+ free(metadataData);
+ }
}
celixThreadMutex_unlock(&entry->writeMutex);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index fe6fd53..6945975 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -487,6 +487,7 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
if (status == CELIX_SUCCESS) {
celix_properties_t *metadata = message->metadata.metadata;
+ bool metadataWasNull = metadata == NULL;
bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata);
bool release = true;
if (cont) {
@@ -498,6 +499,10 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
if (release) {
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
}
+ if (metadataWasNull) {
+ //note that if the metadata was created by the pubsubInterceptorHandler_invokePreReceive, this needs to be deallocated
+ celix_properties_destroy(metadata);
+ }
} else {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);