You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/05/05 12:55:57 UTC
[celix] 01/01: Adds thread protection to the use of the zmq socket.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/protect_zmq_socket
in repository https://gitbox.apache.org/repos/asf/celix.git
commit c0fd8d05a59b1f8d7258ce8804db9658a7e9796c
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue May 5 14:55:31 2020 +0200
Adds thread protection to the use of the zmq socket.
---
bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 1b5abcd..9923a14 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -508,7 +508,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
}
static void psa_zmq_freeMsg(void *msg, void *hint) {
- if(hint) {
+ if (hint) {
psa_zmq_zerocopy_free_entry *entry = hint;
entry->msgSer->freeSerializeMsg(entry->msgSer->handle, entry->serializedOutput, entry->serializedOutputLen);
free(entry);
@@ -597,6 +597,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
bool sendOk;
if (bound->parent->zeroCopyEnabled) {
+ celixThreadMutex_lock(&sender->zmq.mutex);
zmq_msg_t msg1; // Header
zmq_msg_t msg2; // Payload
zmq_msg_t msg3; // Metadata
@@ -641,16 +642,20 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
zmq_msg_close(&msg3);
}
}
+ celixThreadMutex_unlock(&sender->zmq.mutex);
sendOk = rc > 0;
} else {
+ //no zero copy
zmsg_t *msg = zmsg_new();
zmsg_addmem(msg, headerData, headerLength);
zmsg_addmem(msg, payloadData, payloadLength);
if (metadataLength > 0) {
zmsg_addmem(msg, metadataData, metadataLength);
}
+ celixThreadMutex_lock(&sender->zmq.mutex);
int rc = zmsg_send(&msg, sender->zmq.socket);
+ celixThreadMutex_unlock(&sender->zmq.mutex);
sendOk = rc == 0;
if (!sendOk) {