You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/02/04 19:48:23 UTC

[celix] branch feature/CELIX-459-pubsub-hums updated: CELIX-459: Replaces atomic with mutex and makes to use of zerocopy in zmq (for now) configurable.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/CELIX-459-pubsub-hums
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/CELIX-459-pubsub-hums by this push:
     new d65c468  CELIX-459: Replaces atomic with mutex and makes to use of zerocopy in zmq (for now) configurable.
d65c468 is described below

commit d65c468231f036c5ebbd379a96de5d57c651aa9a
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Feb 4 20:47:49 2019 +0100

    CELIX-459: Replaces atomic with mutex and makes to use of zerocopy in zmq (for now) configurable.
---
 .../src/pubsub_psa_zmq_constants.h                 |  3 +
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 71 +++++++++++-----------
 2 files changed, 38 insertions(+), 36 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index a5ebbe2..ade3551 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -41,6 +41,9 @@
 #define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
 #define PSA_ZMQ_DEFAULT_METRICS_ENABLED true
 
+#define PSA_ZMQ_ZEROCOPY_ENABLED "PSA_ZMQ_ZEROCOPY_ENABLED"
+#define PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED false
+
 
 #define PUBSUB_ZMQ_VERBOSE_KEY      "PSA_ZMQ_VERBOSE"
 #define PUBSUB_ZMQ_VERBOSE_DEFAULT  true
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 1320114..ef9a8a8 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -32,7 +32,6 @@
 #include "pubsub_zmq_common.h"
 #include <uuid/uuid.h>
 #include <constants.h>
-#include <stdatomic.h>
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 #define ZMQ_BIND_MAX_RETRY                      10
@@ -53,6 +52,7 @@ struct pubsub_zmq_topic_sender {
     pubsub_serializer_service_t *serializer;
     uuid_t fwUUID;
     bool metricsEnabled;
+    bool zeroCopyEnabled; //TODO tmp, when zero copy is stable remove option
 
     char *scope;
     char *topic;
@@ -80,7 +80,7 @@ struct pubsub_zmq_topic_sender {
 typedef struct psa_zmq_send_msg_entry {
     pubsub_zmq_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
     pubsub_msg_serializer_t *msgSer;
-    _Atomic(int) sendLock; //protects send & Seqnr
+    celix_thread_mutex_t sendLock; //protects send & Seqnr
     int seqNr;
     struct {
         celix_thread_mutex_t mutex; //protects entries in struct
@@ -131,6 +131,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         uuid_parse(uuid, sender->fwUUID);
     }
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
+    sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
     //setting up zmq socket for ZMQ TopicSender
     {
@@ -507,6 +508,11 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
         }
 
         if (status == CELIX_SUCCESS /*ser ok*/) {
+            unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
+
+            //TODO refactor, is the mutex really needed?
+            celixThreadMutex_lock(&entry->sendLock);
+
             pubsub_zmq_msg_header_t msg_hdr = entry->header;
             msg_hdr.seqNr = -1;
             msg_hdr.sendtimeSeconds = 0;
@@ -515,53 +521,46 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 clock_gettime(CLOCK_REALTIME, &sendTime);
                 msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec;
                 msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec;
-                int expected = 0;
-                while (!atomic_compare_exchange_weak(&entry->sendLock, &expected, 1)) {
-                    expected = 0; //if the CAS fails, the expected will be set to 1, so we need to change it to 0 again.
-                }
                 msg_hdr.seqNr = entry->seqNr++;
             }
-            unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
             psa_zmq_encodeHeader(&msg_hdr, hdr, sizeof(pubsub_zmq_msg_header_t));
 
             errno = 0;
             bool sendOk;
 
-//            zmsg_t *msg = zmsg_new();
-//            zmsg_addstr(msg, sender->scopeAndTopicFilter);
-//            zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
-//            zmsg_addmem(msg, serializedOutput, serializedOutputLen);
-//            int rc = zmsg_send(&msg, sender->zmq.socket);
-//            sendOk = rc == 0;
-//            free(serializedOutput);
-//            free(hdr);
-
-            zmq_msg_t msg1; //filter
-            zmq_msg_t msg2; //header
-            zmq_msg_t msg3; //payload
-            zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
-            zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
-            zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
-            void *socket = zsock_resolve(sender->zmq.socket);
-            int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
-            if (rc > 0) {
-                rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
-            }
-            if (rc > 0) {
-                rc = zmq_msg_send(&msg3, socket, 0);
+            if (bound->parent->zeroCopyEnabled) {
+                zmq_msg_t msg1; //filter
+                zmq_msg_t msg2; //header
+                zmq_msg_t msg3; //payload
+                zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
+                zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
+                zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
+                void *socket = zsock_resolve(sender->zmq.socket);
+                int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+                if (rc > 0) {
+                    rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
+                }
+                if (rc > 0) {
+                    rc = zmq_msg_send(&msg3, socket, 0);
+                }
+                sendOk = rc > 0;
+            } else {
+                zmsg_t *msg = zmsg_new();
+                zmsg_addstr(msg, sender->scopeAndTopicFilter);
+                zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
+                zmsg_addmem(msg, serializedOutput, serializedOutputLen);
+                int rc = zmsg_send(&msg, sender->zmq.socket);
+                sendOk = rc == 0;
+                free(serializedOutput);
+                free(hdr);
             }
-            sendOk = rc > 0;
 
-
-            if (monitor) {
-                //unlock send
-                entry->sendLock = 0;
-            }
+            celixThreadMutex_unlock(&entry->sendLock);
             if (sendOk) {
                 sendCountUpdate = 1;
             } else {
                 sendErrorUpdate = 1;
-                L_WARN("[PSA_ZMQ_TS] Error sending zmg, rc is %i. %s", rc, strerror(errno));
+                L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
             }
         } else {
             serializationErrorUpdate = 1;