You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/27 13:59:25 UTC
[celix] branch feature/use_ser_hander_in_psa updated: Refactors tcp
and zmq v2 pubsub admin for a potential race condition.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/use_ser_hander_in_psa by this push:
new b867a60 Refactors tcp and zmq v2 pubsub admin for a potential race condition.
b867a60 is described below
commit b867a608636d942b4611d9802b2bc1e77068deb4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Thu May 27 15:59:11 2021 +0200
Refactors tcp and zmq v2 pubsub admin for a potential race condition.
Also removed metrics support from the pubsub zmq v2 admin, to make the send call much simpler.
---
.../v2/src/pubsub_tcp_topic_receiver.h | 2 -
.../v2/src/pubsub_tcp_topic_sender.c | 92 ++---
.../v2/src/pubsub_tcp_topic_sender.h | 5 -
.../v2/src/pubsub_websocket_topic_sender.c | 3 +-
.../pubsub/pubsub_admin_zmq/v2/src/psa_activator.c | 14 -
.../v2/src/pubsub_psa_zmq_constants.h | 4 -
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c | 28 --
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h | 2 -
.../v2/src/pubsub_zmq_topic_receiver.c | 26 --
.../v2/src/pubsub_zmq_topic_receiver.h | 2 -
.../v2/src/pubsub_zmq_topic_sender.c | 448 +++++++--------------
.../v2/src/pubsub_zmq_topic_sender.h | 4 -
12 files changed, 195 insertions(+), 435 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index d06fe4a..35c14c6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -53,6 +53,4 @@ bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
-pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver);
-
#endif //CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 47ef05a..2c8daf4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -361,8 +361,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
pubsub_tcp_topic_sender_t *sender = bound->parent;
const char* msgFqn;
int majorVersion;
- int minorversion;
- celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+ int minorVersion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
if (status != CELIX_SUCCESS) {
L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
@@ -370,59 +370,61 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
return status;
}
- delay_first_send_for_late_joiners(sender);
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ if (!cont) {
+ L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+ return status;
+ }
size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
struct iovec *serializedIoVecOutput = NULL;
status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
+ sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return status;
+ }
- bool cont = false;
- if (status == CELIX_SUCCESS) /*ser ok*/ {
- cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ delay_first_send_for_late_joiners(sender);
+
+ pubsub_protocol_message_t message;
+ message.metadata.metadata = NULL;
+ message.payload.payload = NULL;
+ message.payload.length = 0;
+ if (serializedIoVecOutput) {
+ message.payload.payload = serializedIoVecOutput->iov_base;
+ message.payload.length = serializedIoVecOutput->iov_len;
}
- if (cont) {
- pubsub_protocol_message_t message;
- message.metadata.metadata = NULL;
- message.payload.payload = NULL;
- message.payload.length = 0;
- if (serializedIoVecOutput) {
- message.payload.payload = serializedIoVecOutput->iov_base;
- message.payload.length = serializedIoVecOutput->iov_len;
+ message.header.msgId = msgTypeId;
+ message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+ message.header.msgMajorVersion = (uint16_t)majorVersion;
+ message.header.msgMinorVersion = (uint16_t)minorVersion;
+ message.header.payloadSize = 0;
+ message.header.payloadPartSize = 0;
+ message.header.payloadOffset = 0;
+ message.header.metadataSize = 0;
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ }
+ bool sendOk = true;
+ {
+ int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+ if (rc < 0) {
+ status = -1;
+ sendOk = false;
}
- message.header.msgId = msgTypeId;
- message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
- message.header.msgMajorVersion = (uint16_t)majorVersion;
- message.header.msgMinorVersion = (uint16_t)minorversion;
- message.header.payloadSize = 0;
- message.header.payloadPartSize = 0;
- message.header.payloadOffset = 0;
- message.header.metadataSize = 0;
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
}
- bool sendOk = true;
- {
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
- if (rc < 0) {
- status = -1;
- sendOk = false;
- }
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
- if (message.metadata.metadata) {
- celix_properties_destroy(message.metadata.metadata);
- }
- if (serializedIoVecOutput) {
- pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
- serializedIoVecOutput = NULL;
- }
+ if (serializedIoVecOutput) {
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+ serializedIoVecOutput = NULL;
}
+ }
- if (!sendOk) {
- L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
- }
- } else {
- L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
- sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ if (!sendOk) {
+ L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
}
return status;
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index 29c8f7a..57b13a6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -49,9 +49,4 @@ bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender);
-
#endif //CELIX_PUBSUB_TCP_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index 28a8af1..adc5ffe 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -257,14 +257,13 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
int majorVersion;
int minorVersion;
celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
-
-
if (status != CELIX_SUCCESS) {
L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
return status;
}
+
if (sender->sockConnection != NULL) {
delay_first_send_for_late_joiners(sender);
size_t serializedOutputLen = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
index a6d1c2d..7aaee4d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
@@ -41,9 +41,6 @@ typedef struct psa_zmq_activator {
pubsub_admin_service_t adminService;
long adminSvcId;
- pubsub_admin_metrics_service_t adminMetricsService;
- long adminMetricsSvcId;
-
celix_shell_command_t cmdSvc;
long cmdSvcId;
} psa_zmq_activator_t;
@@ -90,16 +87,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
}
- if (status == CELIX_SUCCESS) {
- act->adminMetricsService.handle = act->admin;
- act->adminMetricsService.metrics = pubsub_zmqAdmin_metrics;
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
-
- act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
- }
-
//register shell command service
{
act->cmdSvc.handle = act->admin;
@@ -117,7 +104,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
- celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_zmqAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
index c50006a..7f1d891 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
@@ -35,10 +35,6 @@
#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
-
-#define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
-#define PSA_ZMQ_DEFAULT_METRICS_ENABLED true
-
#define PSA_ZMQ_ZEROCOPY_ENABLED "PSA_ZMQ_ZEROCOPY_ENABLED"
#define PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED false
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index cc6de56..f842f01 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -751,34 +751,6 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE
return status;
}
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
- pubsub_zmq_admin_t *psa = handle;
- pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_ZMQ_ADMIN_TYPE);
- result->senders = celix_arrayList_create();
- result->receivers = celix_arrayList_create();
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- pubsub_admin_sender_metrics_t *metrics = pubsub_zmqTopicSender_metrics(sender);
- celix_arrayList_add(result->senders, metrics);
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- pubsub_admin_receiver_metrics_t *metrics = pubsub_zmqTopicReceiver_metrics(receiver);
- celix_arrayList_add(result->receivers, metrics);
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
- return result;
-}
-
static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId) {
pubsub_serializer_handler_t* handler = NULL;
celixThreadMutex_lock(&psa->serializationHandlers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
index 475e464..6a8ba97 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
@@ -48,7 +48,5 @@ void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_prop
bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
-
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 9d2070f..22cbc7e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -70,7 +70,6 @@ struct pubsub_zmq_topic_receiver {
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- bool metricsEnabled;
pubsub_interceptors_handler_t *interceptorsHandler;
@@ -138,7 +137,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
- receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
@@ -442,15 +440,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
}
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
- //NOTE receiver->subscribers.mutex locked
- bool monitor = receiver->metricsEnabled;
-
- //monitoring
- struct timespec beginSer;
- struct timespec endSer;
- int updateReceiveCount = 0;
- int updateSerError = 0;
-
const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
if (msgFqn == NULL) {
L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
@@ -460,16 +449,10 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
void *deserializedMsg = NULL;
bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
if (validVersion) {
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &beginSer);
- }
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &endSer);
- }
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
@@ -495,10 +478,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
if (release) {
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
}
- updateReceiveCount += 1;
}
} else {
- updateSerError += 1;
L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
@@ -616,13 +597,6 @@ static void* psa_zmq_recvThread(void * data) {
return NULL;
}
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
- pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
- return result;
-}
-
static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index 3900f55..f47ce37 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -49,7 +49,5 @@ void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, co
void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url);
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver);
-
#endif //CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index afc8e54..7d9e750 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -57,7 +57,6 @@ struct pubsub_zmq_topic_sender {
long protocolSvcId;
pubsub_protocol_service_t *protocol;
uuid_t fwUUID;
- bool metricsEnabled;
bool zeroCopyEnabled;
pubsub_interceptors_handler_t *interceptorsHandler;
@@ -67,6 +66,18 @@ struct pubsub_zmq_topic_sender {
char *url;
bool isStatic;
+ long seqNr; //atomic
+
+ struct {
+ bool dataLock; //atomic, protects below and protect zmq internal data
+ void *headerBuffer;
+ size_t headerBufferSize;
+ void *metadataBuffer;
+ size_t metadataBufferSize;
+ void *footerBuffer;
+ size_t footerBufferSize;
+ } zmqBuffers;
+
struct {
zsock_t *socket;
zcert_t *cert;
@@ -83,37 +94,10 @@ struct pubsub_zmq_topic_sender {
} boundedServices;
};
-typedef struct psa_zmq_send_msg_entry {
- uint32_t type; //msg type id (hash of fqn)
- const char *fqn;
- uint16_t msgMajorVersion;
- uint16_t msgMinorVersion;
- unsigned char originUUID[16];
- pubsub_protocol_service_t *protSer;
- unsigned int seqNr;
- void *headerBuffer;
- size_t headerBufferSize;
- void *metadataBuffer;
- size_t metadataBufferSize;
- void *footerBuffer;
- size_t footerBufferSize;
- bool dataLocked; // protected ZMQ functions and seqNr
- struct {
- celix_thread_mutex_t mutex; //protects entries in struct
- unsigned long nrOfMessagesSend;
- unsigned long nrOfMessagesSendFailed;
- unsigned long nrOfSerializationErrors;
- struct timespec lastMessageSend;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
- } metrics;
-} psa_zmq_send_msg_entry_t;
-
typedef struct psa_zmq_bounded_service_entry {
pubsub_zmq_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgEntries; //key = msg type id, value = psa_zmq_send_msg_entry_t
int getCount;
} psa_zmq_bounded_service_entry_t;
@@ -156,7 +140,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
@@ -307,20 +290,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
return sender;
}
-static void pubsub_zmqTopicSender_destroyEntry(psa_zmq_send_msg_entry_t *msgEntry) {
- celixThreadMutex_destroy(&msgEntry->metrics.mutex);
- if(msgEntry->headerBuffer != NULL) {
- free(msgEntry->headerBuffer);
- }
- if(msgEntry->metadataBuffer != NULL) {
- free(msgEntry->metadataBuffer);
- }
- if(msgEntry->footerBuffer != NULL) {
- free(msgEntry->footerBuffer);
- }
- free(msgEntry);
-}
-
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
if (sender != NULL) {
celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
@@ -328,21 +297,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
zsock_destroy(&sender->zmq.socket);
celixThreadMutex_lock(&sender->boundedServices.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- pubsub_zmqTopicSender_destroyEntry(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
- free(entry);
- }
- }
- hashMap_destroy(sender->boundedServices.map, false, false);
+ hashMap_destroy(sender->boundedServices.map, false, true);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->boundedServices.mutex);
@@ -354,6 +309,9 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
}
free(sender->topic);
free(sender->url);
+ free(sender->zmqBuffers.headerBuffer);
+ free(sender->zmqBuffers.metadataBuffer);
+ free(sender->zmqBuffers.footerBuffer);
free(sender);
}
}
@@ -401,7 +359,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
entry->service.send = psa_zmq_topicPublicationSend;
@@ -424,286 +381,175 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- pubsub_zmqTopicSender_destroyEntry(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
- pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- size_t count = 0;
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- hashMapIterator_nextValue(&iter2);
- count += 1;
- }
- }
-
- result->msgMetrics = calloc(count, sizeof(*result));
-
- iter = hashMapIterator_construct(sender->boundedServices.map);
- int i = 0;
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_zmq_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
- celixThreadMutex_lock(&mEntry->metrics.mutex);
- result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
- result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
- result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
- result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
- result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
- result->msgMetrics[i].bndId = entry->bndId;
- result->msgMetrics[i].typeId = mEntry->type;
- snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->fqn);
- i += 1;
- celixThreadMutex_unlock(&mEntry->metrics.mutex);
- }
- }
-
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
- result->nrOfmsgMetrics = (int)count;
- return result;
-}
-
static void psa_zmq_freeMsg(void *msg, void *hint) {
psa_zmq_zerocopy_free_entry *entry = hint;
pubsub_serializerHandler_freeSerializedMsg(entry->serHandler, entry->msgId, entry->serializedOutput, entry->serializedOutputLen);
free(entry);
}
-static void psa_zmq_unlockData(void *unused __attribute__((unused)), void *hint) {
- psa_zmq_send_msg_entry_t *entry = hint;
- __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
-}
-
static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SUCCESS;
psa_zmq_bounded_service_entry_t *bound = handle;
pubsub_zmq_topic_sender_t *sender = bound->parent;
- bool monitor = sender->metricsEnabled;
-
- //TODO remove use of entry, so that one less lock is needed and drop metrics stuff
- psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
-
- //metrics updates
- struct timespec sendTime = { 0, 0 };
- struct timespec serializationStart;
- struct timespec serializationEnd;
- //int unknownMessageCountUpdate = 0;
- int sendErrorUpdate = 0;
- int serializationErrorUpdate = 0;
- int sendCountUpdate = 0;
-
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
- entry->protSer = sender->protocol;
- entry->type = msgTypeId;
- entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
- entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
- entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
- uuid_copy(entry->originUUID, sender->fwUUID);
- celixThreadMutex_create(&entry->metrics.mutex, NULL);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
- }
-
- delay_first_send_for_late_joiners(sender);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationStart);
- }
- size_t serializedOutputLen = 0;
- struct iovec *serializedOutput = NULL;
- status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationEnd);
+ const char* msgFqn;
+ int majorVersion;
+ int minorversion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
- if (status == CELIX_SUCCESS /*ser ok*/) {
- // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
- bool expected = false;
- while(!__atomic_compare_exchange_n(&entry->dataLocked, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
- expected = false;
- usleep(500);
- }
-
- bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
- if (cont) {
-
- pubsub_protocol_message_t message;
- message.payload.payload = serializedOutput->iov_base;
- message.payload.length = serializedOutput->iov_len;
-
- void *payloadData = NULL;
- size_t payloadLength = 0;
- entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ if (!cont) {
+ L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+ return status;
+ }
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
- entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize);
- } else {
- message.metadata.metadata = NULL;
- }
+ size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
+ struct iovec *serializedIoVecOutput = NULL;
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
- entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize);
-
- message.header.msgId = msgTypeId;
- message.header.seqNr = entry->seqNr;
- message.header.msgMajorVersion = entry->msgMajorVersion;
- message.header.msgMinorVersion = entry->msgMinorVersion;
- message.header.payloadSize = payloadLength;
- message.header.metadataSize = entry->metadataBufferSize;
- message.header.payloadPartSize = payloadLength;
- message.header.payloadOffset = 0;
- message.header.isLastSegment = 1;
- message.header.convertEndianess = 0;
-
- // increase seqNr
- entry->seqNr++;
-
- entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize);
-
- errno = 0;
- bool sendOk;
-
- if (bound->parent->zeroCopyEnabled) {
-
- zmq_msg_t msg1; // Header
- zmq_msg_t msg2; // Payload
- zmq_msg_t msg3; // Metadata
- zmq_msg_t msg4; // Footer
- void *socket = zsock_resolve(sender->zmq.socket);
- psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
- freeMsgEntry->serHandler = sender->serializerHandler;
- freeMsgEntry->msgId = msgTypeId;
- freeMsgEntry->serializedOutput = serializedOutput;
- freeMsgEntry->serializedOutputLen = serializedOutputLen;
-
- zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry);
- //send header
- int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
- if (rc == -1) {
- L_WARN("Error sending header msg. %s", strerror(errno));
- zmq_msg_close(&msg1);
- }
+ if (status != CELIX_SUCCESS /*serialization not ok*/) {
+ L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return status;
+ }
- //send Payload
- if (rc > 0) {
- int flag = ((entry->metadataBufferSize > 0) || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
- rc = zmq_msg_send(&msg2, socket, flag);
- if (rc == -1) {
- L_WARN("Error sending payload msg. %s", strerror(errno));
- zmq_msg_close(&msg2);
- }
- }
+ delay_first_send_for_late_joiners(sender);
- //send MetaData
- if (rc > 0 && entry->metadataBufferSize > 0) {
- int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
- zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL);
- rc = zmq_msg_send(&msg3, socket, flag);
- if (rc == -1) {
- L_WARN("Error sending metadata msg. %s", strerror(errno));
- zmq_msg_close(&msg3);
- }
- }
+ // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
+ // Also protect sender->zmqBuffers (header, meta and footer)
+ bool expected = false;
+ while(!__atomic_compare_exchange_n(&sender->zmqBuffers.dataLock, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
+ expected = false;
+ usleep(5);
+ }
- //send Footer
- if (rc > 0 && entry->footerBufferSize > 0) {
- zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL);
- rc = zmq_msg_send(&msg4, socket, 0);
- if (rc == -1) {
- L_WARN("Error sending footer msg. %s", strerror(errno));
- zmq_msg_close(&msg4);
- }
- }
+ pubsub_protocol_message_t message;
+ message.payload.payload = serializedIoVecOutput->iov_base;
+ message.payload.length = serializedIoVecOutput->iov_len;
- sendOk = rc > 0;
- } else {
- //no zero copy
- zmsg_t *msg = zmsg_new();
- zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
- zmsg_addmem(msg, payloadData, payloadLength);
- if (entry->metadataBufferSize > 0) {
- zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
- }
- if (entry->footerBufferSize > 0) {
- zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
- }
- int rc = zmsg_send(&msg, sender->zmq.socket);
- sendOk = rc == 0;
+ void *payloadData = NULL;
+ size_t payloadLength = 0;
+ sender->protocol->encodePayload(sender->protocol->handle, &message, &payloadData, &payloadLength);
- if (!sendOk) {
- zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
- }
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ sender->protocol->encodeMetadata(sender->protocol->handle, &message, &sender->zmqBuffers.metadataBuffer, &sender->zmqBuffers.metadataBufferSize);
+ } else {
+ message.metadata.metadata = NULL;
+ }
- // Note: serialized Payload is deleted by serializer
- if (payloadData && (payloadData != message.payload.payload)) {
- free(payloadData);
- }
+ sender->protocol->encodeFooter(sender->protocol->handle, &message, &sender->zmqBuffers.footerBuffer, &sender->zmqBuffers.footerBufferSize);
+
+ message.header.msgId = msgTypeId;
+ message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+ message.header.msgMajorVersion = majorVersion;
+ message.header.msgMinorVersion = minorversion;
+ message.header.payloadSize = payloadLength;
+ message.header.metadataSize = sender->zmqBuffers.metadataBufferSize;
+ message.header.payloadPartSize = payloadLength;
+ message.header.payloadOffset = 0;
+ message.header.isLastSegment = 1;
+ message.header.convertEndianess = 0;
+
+ sender->protocol->encodeHeader(sender->protocol->handle, &message, &sender->zmqBuffers.headerBuffer, &sender->zmqBuffers.headerBufferSize);
+
+ errno = 0;
+ bool sendOk;
+ if (bound->parent->zeroCopyEnabled) {
+ zmq_msg_t msg1; // Header
+ zmq_msg_t msg2; // Payload
+ zmq_msg_t msg3; // Metadata
+ zmq_msg_t msg4; // Footer
+ void *socket = zsock_resolve(sender->zmq.socket);
+ psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); //NOTE should be improved. Not really zero copy
+ freeMsgEntry->serHandler = sender->serializerHandler;
+ freeMsgEntry->msgId = msgTypeId;
+ freeMsgEntry->serializedOutput = serializedIoVecOutput;
+ freeMsgEntry->serializedOutputLen = serializedIoVecOutputLen;
+
+ zmq_msg_init_data(&msg1, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize, NULL, NULL);
+ //send header
+ int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+ if (rc == -1) {
+ L_WARN("Error sending header msg. %s", strerror(errno));
+ zmq_msg_close(&msg1);
+ }
- __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
+ //send Payload
+ if (rc > 0) {
+ int flag = ((sender->zmqBuffers.metadataBufferSize > 0) || (sender->zmqBuffers.footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+ rc = zmq_msg_send(&msg2, socket, flag);
+ if (rc == -1) {
+ L_WARN("Error sending payload msg. %s", strerror(errno));
+ zmq_msg_close(&msg2);
}
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
+ }
- if (message.metadata.metadata) {
- celix_properties_destroy(message.metadata.metadata);
- }
- if (!bound->parent->zeroCopyEnabled && serializedOutput) {
- pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
+ //send MetaData
+ if (rc > 0 && sender->zmqBuffers.metadataBufferSize > 0) {
+ int flag = (sender->zmqBuffers.footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg3, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize, NULL, NULL);
+ rc = zmq_msg_send(&msg3, socket, flag);
+ if (rc == -1) {
+ L_WARN("Error sending metadata msg. %s", strerror(errno));
+ zmq_msg_close(&msg3);
}
+ }
- if (sendOk) {
- sendCountUpdate = 1;
- } else {
- sendErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+ //send Footer
+ if (rc > 0 && sender->zmqBuffers.footerBufferSize > 0) {
+ zmq_msg_init_data(&msg4, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize, NULL, NULL);
+ rc = zmq_msg_send(&msg4, socket, 0);
+ if (rc == -1) {
+ L_WARN("Error sending footer msg. %s", strerror(errno));
+ zmq_msg_close(&msg4);
}
- } else {
- L_WARN("no continue");
}
+ sendOk = rc > 0;
} else {
- serializationErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- }
-
- if (monitor) {
- celixThreadMutex_lock(&entry->metrics.mutex);
+ //no zero copy
+ zmsg_t *msg = zmsg_new();
+ zmsg_addmem(msg, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize);
+ zmsg_addmem(msg, payloadData, payloadLength);
+ if (sender->zmqBuffers.metadataBufferSize > 0) {
+ zmsg_addmem(msg, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize);
+ }
+ if (sender->zmqBuffers.footerBufferSize > 0) {
+ zmsg_addmem(msg, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize);
+ }
+ int rc = zmsg_send(&msg, sender->zmq.socket);
+ sendOk = rc == 0;
- double n = (double)(entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed);
- double diff = celix_difftime(&serializationStart, &serializationEnd);
- double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
- entry->metrics.averageSerializationTimeInSeconds = average;
+ if (!sendOk) {
+ zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+ }
- if (entry->metrics.nrOfMessagesSend > 2) {
- diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
- n = entry->metrics.nrOfMessagesSend;
- average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1);
- entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+ // Note: serialized Payload is deleted by serializer
+ if (payloadData && (payloadData != message.payload.payload)) {
+ free(payloadData);
}
+ }
+ __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE);
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
- entry->metrics.lastMessageSend = sendTime;
- entry->metrics.nrOfMessagesSend += sendCountUpdate;
- entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
- entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
+ }
+ if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) {
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+ }
- celixThreadMutex_unlock(&entry->metrics.mutex);
+ if (!sendOk) {
+ L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
}
return status;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index 584b88d..bb49a2a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -51,9 +51,5 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender);
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender);
#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H