You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/02/04 19:48:23 UTC
[celix] branch feature/CELIX-459-pubsub-hums updated: CELIX-459:
Replaces atomic with mutex and makes to use of zerocopy in zmq (for now)
configurable.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/CELIX-459-pubsub-hums
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/CELIX-459-pubsub-hums by this push:
new d65c468 CELIX-459: Replaces atomic with mutex and makes to use of zerocopy in zmq (for now) configurable.
d65c468 is described below
commit d65c468231f036c5ebbd379a96de5d57c651aa9a
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Feb 4 20:47:49 2019 +0100
CELIX-459: Replaces atomic with mutex and makes to use of zerocopy in zmq (for now) configurable.
---
.../src/pubsub_psa_zmq_constants.h | 3 +
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 71 +++++++++++-----------
2 files changed, 38 insertions(+), 36 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index a5ebbe2..ade3551 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -41,6 +41,9 @@
#define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
#define PSA_ZMQ_DEFAULT_METRICS_ENABLED true
+#define PSA_ZMQ_ZEROCOPY_ENABLED "PSA_ZMQ_ZEROCOPY_ENABLED"
+#define PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED false
+
#define PUBSUB_ZMQ_VERBOSE_KEY "PSA_ZMQ_VERBOSE"
#define PUBSUB_ZMQ_VERBOSE_DEFAULT true
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 1320114..ef9a8a8 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -32,7 +32,6 @@
#include "pubsub_zmq_common.h"
#include <uuid/uuid.h>
#include <constants.h>
-#include <stdatomic.h>
#define FIRST_SEND_DELAY_IN_SECONDS 2
#define ZMQ_BIND_MAX_RETRY 10
@@ -53,6 +52,7 @@ struct pubsub_zmq_topic_sender {
pubsub_serializer_service_t *serializer;
uuid_t fwUUID;
bool metricsEnabled;
+ bool zeroCopyEnabled; //TODO tmp, when zero copy is stable remove option
char *scope;
char *topic;
@@ -80,7 +80,7 @@ struct pubsub_zmq_topic_sender {
typedef struct psa_zmq_send_msg_entry {
pubsub_zmq_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
pubsub_msg_serializer_t *msgSer;
- _Atomic(int) sendLock; //protects send & Seqnr
+ celix_thread_mutex_t sendLock; //protects send & Seqnr
int seqNr;
struct {
celix_thread_mutex_t mutex; //protects entries in struct
@@ -131,6 +131,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
uuid_parse(uuid, sender->fwUUID);
}
sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
+ sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
//setting up zmq socket for ZMQ TopicSender
{
@@ -507,6 +508,11 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
if (status == CELIX_SUCCESS /*ser ok*/) {
+ unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
+
+ //TODO refactor, is the mutex really needed?
+ celixThreadMutex_lock(&entry->sendLock);
+
pubsub_zmq_msg_header_t msg_hdr = entry->header;
msg_hdr.seqNr = -1;
msg_hdr.sendtimeSeconds = 0;
@@ -515,53 +521,46 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
clock_gettime(CLOCK_REALTIME, &sendTime);
msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec;
msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec;
- int expected = 0;
- while (!atomic_compare_exchange_weak(&entry->sendLock, &expected, 1)) {
- expected = 0; //if the CAS fails, the expected will be set to 1, so we need to change it to 0 again.
- }
msg_hdr.seqNr = entry->seqNr++;
}
- unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
psa_zmq_encodeHeader(&msg_hdr, hdr, sizeof(pubsub_zmq_msg_header_t));
errno = 0;
bool sendOk;
-// zmsg_t *msg = zmsg_new();
-// zmsg_addstr(msg, sender->scopeAndTopicFilter);
-// zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
-// zmsg_addmem(msg, serializedOutput, serializedOutputLen);
-// int rc = zmsg_send(&msg, sender->zmq.socket);
-// sendOk = rc == 0;
-// free(serializedOutput);
-// free(hdr);
-
- zmq_msg_t msg1; //filter
- zmq_msg_t msg2; //header
- zmq_msg_t msg3; //payload
- zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
- zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
- zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
- void *socket = zsock_resolve(sender->zmq.socket);
- int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
- if (rc > 0) {
- rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
- }
- if (rc > 0) {
- rc = zmq_msg_send(&msg3, socket, 0);
+ if (bound->parent->zeroCopyEnabled) {
+ zmq_msg_t msg1; //filter
+ zmq_msg_t msg2; //header
+ zmq_msg_t msg3; //payload
+ zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
+ zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
+ zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
+ void *socket = zsock_resolve(sender->zmq.socket);
+ int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+ if (rc > 0) {
+ rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
+ }
+ if (rc > 0) {
+ rc = zmq_msg_send(&msg3, socket, 0);
+ }
+ sendOk = rc > 0;
+ } else {
+ zmsg_t *msg = zmsg_new();
+ zmsg_addstr(msg, sender->scopeAndTopicFilter);
+ zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
+ zmsg_addmem(msg, serializedOutput, serializedOutputLen);
+ int rc = zmsg_send(&msg, sender->zmq.socket);
+ sendOk = rc == 0;
+ free(serializedOutput);
+ free(hdr);
}
- sendOk = rc > 0;
-
- if (monitor) {
- //unlock send
- entry->sendLock = 0;
- }
+ celixThreadMutex_unlock(&entry->sendLock);
if (sendOk) {
sendCountUpdate = 1;
} else {
sendErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error sending zmg, rc is %i. %s", rc, strerror(errno));
+ L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
}
} else {
serializationErrorUpdate = 1;