You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/05/05 12:55:57 UTC

[celix] 01/01: Adds thread protection to the use of zmq socket.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/protect_zmq_socket
in repository https://gitbox.apache.org/repos/asf/celix.git

commit c0fd8d05a59b1f8d7258ce8804db9658a7e9796c
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue May 5 14:55:31 2020 +0200

    Adds thread protection to the use of zmq socket.
---
 bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 1b5abcd..9923a14 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -508,7 +508,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
 }
 
 static void psa_zmq_freeMsg(void *msg, void *hint) {
-    if(hint) {
+    if (hint) {
         psa_zmq_zerocopy_free_entry *entry = hint;
         entry->msgSer->freeSerializeMsg(entry->msgSer->handle, entry->serializedOutput, entry->serializedOutputLen);
         free(entry);
@@ -597,6 +597,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 bool sendOk;
 
                 if (bound->parent->zeroCopyEnabled) {
+                    celixThreadMutex_lock(&sender->zmq.mutex);
                     zmq_msg_t msg1; // Header
                     zmq_msg_t msg2; // Payload
                     zmq_msg_t msg3; // Metadata
@@ -641,16 +642,20 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                             zmq_msg_close(&msg3);
                         }
                     }
+                    celixThreadMutex_unlock(&sender->zmq.mutex);
 
                     sendOk = rc > 0;
                 } else {
+                    //no zero copy
                     zmsg_t *msg = zmsg_new();
                     zmsg_addmem(msg, headerData, headerLength);
                     zmsg_addmem(msg, payloadData, payloadLength);
                     if (metadataLength > 0) {
                         zmsg_addmem(msg, metadataData, metadataLength);
                     }
+                    celixThreadMutex_lock(&sender->zmq.mutex);
                     int rc = zmsg_send(&msg, sender->zmq.socket);
+                    celixThreadMutex_unlock(&sender->zmq.mutex);
                     sendOk = rc == 0;
 
                     if (!sendOk) {