You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/27 13:59:25 UTC

[celix] branch feature/use_ser_hander_in_psa updated: Refactors tcp and zmq v2 pubsub admin for a potential race condition.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/use_ser_hander_in_psa by this push:
     new b867a60  Refactors tcp and zmq v2 pubsub admin for a potential race condition.
b867a60 is described below

commit b867a608636d942b4611d9802b2bc1e77068deb4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Thu May 27 15:59:11 2021 +0200

    Refactors tcp and zmq v2 pubsub admin for a potential race condition.
    
    Also removed metric support from pubsub zmq v2, to make the send call much simpler.
---
 .../v2/src/pubsub_tcp_topic_receiver.h             |   2 -
 .../v2/src/pubsub_tcp_topic_sender.c               |  92 ++---
 .../v2/src/pubsub_tcp_topic_sender.h               |   5 -
 .../v2/src/pubsub_websocket_topic_sender.c         |   3 +-
 .../pubsub/pubsub_admin_zmq/v2/src/psa_activator.c |  14 -
 .../v2/src/pubsub_psa_zmq_constants.h              |   4 -
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c     |  28 --
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h     |   2 -
 .../v2/src/pubsub_zmq_topic_receiver.c             |  26 --
 .../v2/src/pubsub_zmq_topic_receiver.h             |   2 -
 .../v2/src/pubsub_zmq_topic_sender.c               | 448 +++++++--------------
 .../v2/src/pubsub_zmq_topic_sender.h               |   4 -
 12 files changed, 195 insertions(+), 435 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index d06fe4a..35c14c6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -53,6 +53,4 @@ bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
 void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
 void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
 
-pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver);
-
 #endif //CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 47ef05a..2c8daf4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -361,8 +361,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
     pubsub_tcp_topic_sender_t *sender = bound->parent;
     const char* msgFqn;
     int majorVersion;
-    int minorversion;
-    celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+    int minorVersion;
+    celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
 
     if (status != CELIX_SUCCESS) {
         L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
@@ -370,59 +370,61 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
         return status;
     }
 
-    delay_first_send_for_late_joiners(sender);
+    bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+    if (!cont) {
+        L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        return status;
+    }
 
     size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
     struct iovec *serializedIoVecOutput = NULL;
     status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+    if (status != CELIX_SUCCESS) {
+        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
+               sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+        return status;
+    }
 
-    bool cont = false;
-    if (status == CELIX_SUCCESS) /*ser ok*/ {
-        cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+    delay_first_send_for_late_joiners(sender);
+
+    pubsub_protocol_message_t message;
+    message.metadata.metadata = NULL;
+    message.payload.payload = NULL;
+    message.payload.length = 0;
+    if (serializedIoVecOutput) {
+        message.payload.payload = serializedIoVecOutput->iov_base;
+        message.payload.length = serializedIoVecOutput->iov_len;
     }
-    if (cont) {
-        pubsub_protocol_message_t message;
-        message.metadata.metadata = NULL;
-        message.payload.payload = NULL;
-        message.payload.length = 0;
-        if (serializedIoVecOutput) {
-            message.payload.payload = serializedIoVecOutput->iov_base;
-            message.payload.length = serializedIoVecOutput->iov_len;
+    message.header.msgId = msgTypeId;
+    message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+    message.header.msgMajorVersion = (uint16_t)majorVersion;
+    message.header.msgMinorVersion = (uint16_t)minorVersion;
+    message.header.payloadSize = 0;
+    message.header.payloadPartSize = 0;
+    message.header.payloadOffset = 0;
+    message.header.metadataSize = 0;
+    if (metadata != NULL) {
+        message.metadata.metadata = metadata;
+    }
+    bool sendOk = true;
+    {
+        int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+        if (rc < 0) {
+            status = -1;
+            sendOk = false;
         }
-        message.header.msgId = msgTypeId;
-        message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
-        message.header.msgMajorVersion = (uint16_t)majorVersion;
-        message.header.msgMinorVersion = (uint16_t)minorversion;
-        message.header.payloadSize = 0;
-        message.header.payloadPartSize = 0;
-        message.header.payloadOffset = 0;
-        message.header.metadataSize = 0;
-        if (metadata != NULL) {
-            message.metadata.metadata = metadata;
+        pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+        if (message.metadata.metadata) {
+            celix_properties_destroy(message.metadata.metadata);
         }
-        bool sendOk = true;
-        {
-            int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
-            if (rc < 0) {
-                status = -1;
-                sendOk = false;
-            }
-            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
-            if (message.metadata.metadata) {
-                celix_properties_destroy(message.metadata.metadata);
-            }
-            if (serializedIoVecOutput) {
-                pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
-                serializedIoVecOutput = NULL;
-            }
+        if (serializedIoVecOutput) {
+            pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+            serializedIoVecOutput = NULL;
         }
+    }
 
-        if (!sendOk) {
-            L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
-        }
-    } else {
-        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
-               sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+    if (!sendOk) {
+        L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
     }
 
     return status;
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index 29c8f7a..57b13a6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -49,9 +49,4 @@ bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
 bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
 
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender);
-
 #endif //CELIX_PUBSUB_TCP_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index 28a8af1..adc5ffe 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -257,14 +257,13 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
     int majorVersion;
     int minorVersion;
     celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
-
-
     if (status != CELIX_SUCCESS) {
         L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
                pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
         return status;
     }
 
+
     if (sender->sockConnection != NULL) {
         delay_first_send_for_late_joiners(sender);
         size_t serializedOutputLen = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
index a6d1c2d..7aaee4d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
@@ -41,9 +41,6 @@ typedef struct psa_zmq_activator {
     pubsub_admin_service_t adminService;
     long adminSvcId;
 
-    pubsub_admin_metrics_service_t adminMetricsService;
-    long adminMetricsSvcId;
-
     celix_shell_command_t cmdSvc;
     long cmdSvcId;
 } psa_zmq_activator_t;
@@ -90,16 +87,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
         act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
     }
 
-    if (status == CELIX_SUCCESS) {
-        act->adminMetricsService.handle = act->admin;
-        act->adminMetricsService.metrics = pubsub_zmqAdmin_metrics;
-
-        celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
-
-        act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
-    }
-
     //register shell command service
     {
         act->cmdSvc.handle = act->admin;
@@ -117,7 +104,6 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
     celix_bundleContext_unregisterService(ctx, act->adminSvcId);
     celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
-    celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
     celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
     celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
     pubsub_zmqAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
index c50006a..7f1d891 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
@@ -35,10 +35,6 @@
 #define PSA_ZMQ_QOS_CONTROL_SCORE_KEY           "PSA_ZMQ_QOS_CONTROL_SCORE"
 #define PSA_ZMQ_DEFAULT_SCORE_KEY               "PSA_ZMQ_DEFAULT_SCORE"
 
-
-#define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
-#define PSA_ZMQ_DEFAULT_METRICS_ENABLED true
-
 #define PSA_ZMQ_ZEROCOPY_ENABLED "PSA_ZMQ_ZEROCOPY_ENABLED"
 #define PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED false
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index cc6de56..f842f01 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -751,34 +751,6 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE
     return status;
 }
 
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
-    pubsub_zmq_admin_t *psa = handle;
-    pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_ZMQ_ADMIN_TYPE);
-    result->senders = celix_arrayList_create();
-    result->receivers = celix_arrayList_create();
-
-    celixThreadMutex_lock(&psa->topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
-        pubsub_admin_sender_metrics_t *metrics = pubsub_zmqTopicSender_metrics(sender);
-        celix_arrayList_add(result->senders, metrics);
-    }
-    celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-    celixThreadMutex_lock(&psa->topicReceivers.mutex);
-    iter = hashMapIterator_construct(psa->topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
-        pubsub_admin_receiver_metrics_t *metrics = pubsub_zmqTopicReceiver_metrics(receiver);
-        celix_arrayList_add(result->receivers, metrics);
-    }
-    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
-    return result;
-}
-
 static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId) {
     pubsub_serializer_handler_t* handler = NULL;
     celixThreadMutex_lock(&psa->serializationHandlers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
index 475e464..6a8ba97 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
@@ -48,7 +48,5 @@ void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_prop
 
 bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
 
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
-
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 9d2070f..22cbc7e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -70,7 +70,6 @@ struct pubsub_zmq_topic_receiver {
     pubsub_protocol_service_t *protocol;
     char *scope;
     char *topic;
-    bool metricsEnabled;
 
     pubsub_interceptors_handler_t *interceptorsHandler;
 
@@ -138,7 +137,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
     receiver->topic = celix_utils_strdup(topic);
-    receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
     pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
 
@@ -442,15 +440,6 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 }
 
 static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
-    //NOTE receiver->subscribers.mutex locked
-    bool monitor = receiver->metricsEnabled;
-
-    //monitoring
-    struct timespec beginSer;
-    struct timespec endSer;
-    int updateReceiveCount = 0;
-    int updateSerError = 0;
-
     const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
     if (msgFqn == NULL) {
         L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
@@ -460,16 +449,10 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     void *deserializedMsg = NULL;
     bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
     if (validVersion) {
-        if (monitor) {
-            clock_gettime(CLOCK_REALTIME, &beginSer);
-        }
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = message->payload.payload;
         deSerializeBuffer.iov_len  = message->payload.length;
         celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
-        if (monitor) {
-            clock_gettime(CLOCK_REALTIME, &endSer);
-        }
         if (status == CELIX_SUCCESS) {
             uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
@@ -495,10 +478,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
                 if (release) {
                     pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
                 }
-                updateReceiveCount += 1;
             }
         } else {
-            updateSerError += 1;
             L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
@@ -616,13 +597,6 @@ static void* psa_zmq_recvThread(void * data) {
     return NULL;
 }
 
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
-    pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
-    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
-    return result;
-}
-
 
 static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index 3900f55..f47ce37 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -49,7 +49,5 @@ void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, co
 void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url);
 
 
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver);
-
 
 #endif //CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index afc8e54..7d9e750 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -57,7 +57,6 @@ struct pubsub_zmq_topic_sender {
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
     uuid_t fwUUID;
-    bool metricsEnabled;
     bool zeroCopyEnabled;
 
     pubsub_interceptors_handler_t *interceptorsHandler;
@@ -67,6 +66,18 @@ struct pubsub_zmq_topic_sender {
     char *url;
     bool isStatic;
 
+    long seqNr; //atomic
+
+    struct {
+        bool dataLock; //atomic, protects below and protect zmq internal data
+        void *headerBuffer;
+        size_t headerBufferSize;
+        void *metadataBuffer;
+        size_t metadataBufferSize;
+        void *footerBuffer;
+        size_t footerBufferSize;
+    } zmqBuffers;
+
     struct {
         zsock_t *socket;
         zcert_t *cert;
@@ -83,37 +94,10 @@ struct pubsub_zmq_topic_sender {
     } boundedServices;
 };
 
-typedef struct psa_zmq_send_msg_entry {
-    uint32_t type; //msg type id (hash of fqn)
-    const char *fqn;
-    uint16_t msgMajorVersion;
-    uint16_t msgMinorVersion;
-    unsigned char originUUID[16];
-    pubsub_protocol_service_t *protSer;
-    unsigned int seqNr;
-    void *headerBuffer;
-    size_t headerBufferSize;
-    void *metadataBuffer;
-    size_t metadataBufferSize;
-    void *footerBuffer;
-    size_t footerBufferSize;
-    bool dataLocked; // protected ZMQ functions and seqNr
-    struct {
-        celix_thread_mutex_t mutex; //protects entries in struct
-        unsigned long nrOfMessagesSend;
-        unsigned long nrOfMessagesSendFailed;
-        unsigned long nrOfSerializationErrors;
-        struct timespec lastMessageSend;
-        double averageTimeBetweenMessagesInSeconds;
-        double averageSerializationTimeInSeconds;
-    } metrics;
-} psa_zmq_send_msg_entry_t;
-
 typedef struct psa_zmq_bounded_service_entry {
     pubsub_zmq_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
-    hash_map_t *msgEntries; //key = msg type id, value = psa_zmq_send_msg_entry_t
     int getCount;
 } psa_zmq_bounded_service_entry_t;
 
@@ -156,7 +140,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
     pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
@@ -307,20 +290,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     return sender;
 }
 
-static void pubsub_zmqTopicSender_destroyEntry(psa_zmq_send_msg_entry_t *msgEntry) {
-    celixThreadMutex_destroy(&msgEntry->metrics.mutex);
-    if(msgEntry->headerBuffer != NULL) {
-        free(msgEntry->headerBuffer);
-    }
-    if(msgEntry->metadataBuffer != NULL) {
-        free(msgEntry->metadataBuffer);
-    }
-    if(msgEntry->footerBuffer != NULL) {
-        free(msgEntry->footerBuffer);
-    }
-    free(msgEntry);
-}
-
 void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
     if (sender != NULL) {
         celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
@@ -328,21 +297,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
         zsock_destroy(&sender->zmq.socket);
 
         celixThreadMutex_lock(&sender->boundedServices.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
-                    pubsub_zmqTopicSender_destroyEntry(msgEntry);
-                }
-                hashMap_destroy(entry->msgEntries, false, false);
-
-                free(entry);
-            }
-        }
-        hashMap_destroy(sender->boundedServices.map, false, false);
+        hashMap_destroy(sender->boundedServices.map, false, true);
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
 
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
@@ -354,6 +309,9 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
         }
         free(sender->topic);
         free(sender->url);
+        free(sender->zmqBuffers.headerBuffer);
+        free(sender->zmqBuffers.metadataBuffer);
+        free(sender->zmqBuffers.footerBuffer);
         free(sender);
     }
 }
@@ -401,7 +359,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
         entry->getCount = 1;
         entry->parent = sender;
         entry->bndId = bndId;
-        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
         entry->service.handle = entry;
         entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
         entry->service.send = psa_zmq_topicPublicationSend;
@@ -424,286 +381,175 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
-        hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
-            pubsub_zmqTopicSender_destroyEntry(msgEntry);
-        }
-        hashMap_destroy(entry->msgEntries, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
-    pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
-    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    size_t count = 0;
-    hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter2)) {
-            hashMapIterator_nextValue(&iter2);
-            count += 1;
-        }
-    }
-
-    result->msgMetrics = calloc(count, sizeof(*result));
-
-    iter = hashMapIterator_construct(sender->boundedServices.map);
-    int i = 0;
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter2)) {
-            psa_zmq_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
-            celixThreadMutex_lock(&mEntry->metrics.mutex);
-            result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
-            result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
-            result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
-            result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
-            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
-            result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
-            result->msgMetrics[i].bndId = entry->bndId;
-            result->msgMetrics[i].typeId = mEntry->type;
-            snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->fqn);
-            i += 1;
-            celixThreadMutex_unlock(&mEntry->metrics.mutex);
-        }
-    }
-
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
-    result->nrOfmsgMetrics = (int)count;
-    return result;
-}
-
 static void psa_zmq_freeMsg(void *msg, void *hint) {
     psa_zmq_zerocopy_free_entry *entry = hint;
     pubsub_serializerHandler_freeSerializedMsg(entry->serHandler, entry->msgId, entry->serializedOutput, entry->serializedOutputLen);
     free(entry);
 }
 
-static void psa_zmq_unlockData(void *unused __attribute__((unused)), void *hint) {
-    psa_zmq_send_msg_entry_t *entry = hint;
-    __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
-}
-
 static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
-    int status = CELIX_SUCCESS;
     psa_zmq_bounded_service_entry_t *bound = handle;
     pubsub_zmq_topic_sender_t *sender = bound->parent;
-    bool monitor = sender->metricsEnabled;
-
-    //TODO remove use of entry, so that one less lock is needed and drop metrics stuff
-    psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
-
-    //metrics updates
-    struct timespec sendTime = { 0, 0 };
-    struct timespec serializationStart;
-    struct timespec serializationEnd;
-    //int unknownMessageCountUpdate = 0;
-    int sendErrorUpdate = 0;
-    int serializationErrorUpdate = 0;
-    int sendCountUpdate = 0;
-
-    if (entry == NULL) {
-        entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
-        entry->protSer = sender->protocol;
-        entry->type = msgTypeId;
-        entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
-        entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId);
-        entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId);
-        uuid_copy(entry->originUUID, sender->fwUUID);
-        celixThreadMutex_create(&entry->metrics.mutex, NULL);
-        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
-    }
-
-    delay_first_send_for_late_joiners(sender);
 
-    if (monitor) {
-        clock_gettime(CLOCK_REALTIME, &serializationStart);
-    }
-    size_t serializedOutputLen = 0;
-    struct iovec *serializedOutput = NULL;
-    status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
-    if (monitor) {
-        clock_gettime(CLOCK_REALTIME, &serializationEnd);
+    const char* msgFqn;
+    int majorVersion;
+    int minorversion;
+    celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+    if (status != CELIX_SUCCESS) {
+        L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+               pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+        return status;
     }
 
-    if (status == CELIX_SUCCESS /*ser ok*/) {
-        // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
-        bool expected = false;
-        while(!__atomic_compare_exchange_n(&entry->dataLocked, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
-            expected = false;
-            usleep(500);
-        }
-
-        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata);
-        if (cont) {
-
-            pubsub_protocol_message_t message;
-            message.payload.payload = serializedOutput->iov_base;
-            message.payload.length = serializedOutput->iov_len;
-
-            void *payloadData = NULL;
-            size_t payloadLength = 0;
-            entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+    bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+    if (!cont) {
+        L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        return status;
+    }
 
-            if (metadata != NULL) {
-                message.metadata.metadata = metadata;
-                entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize);
-            } else {
-                message.metadata.metadata = NULL;
-            }
+    size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
+    struct iovec *serializedIoVecOutput = NULL;
+    status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
 
-            entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize);
-
-            message.header.msgId = msgTypeId;
-            message.header.seqNr = entry->seqNr;
-            message.header.msgMajorVersion = entry->msgMajorVersion;
-            message.header.msgMinorVersion = entry->msgMinorVersion;
-            message.header.payloadSize = payloadLength;
-            message.header.metadataSize = entry->metadataBufferSize;
-            message.header.payloadPartSize = payloadLength;
-            message.header.payloadOffset = 0;
-            message.header.isLastSegment = 1;
-            message.header.convertEndianess = 0;
-
-            // increase seqNr
-            entry->seqNr++;
-
-            entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize);
-
-            errno = 0;
-            bool sendOk;
-
-            if (bound->parent->zeroCopyEnabled) {
-
-                zmq_msg_t msg1; // Header
-                zmq_msg_t msg2; // Payload
-                zmq_msg_t msg3; // Metadata
-                zmq_msg_t msg4; // Footer
-                void *socket = zsock_resolve(sender->zmq.socket);
-                psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
-                freeMsgEntry->serHandler = sender->serializerHandler;
-                freeMsgEntry->msgId = msgTypeId;
-                freeMsgEntry->serializedOutput = serializedOutput;
-                freeMsgEntry->serializedOutputLen = serializedOutputLen;
-
-                zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry);
-                //send header
-                int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
-                if (rc == -1) {
-                    L_WARN("Error sending header msg. %s", strerror(errno));
-                    zmq_msg_close(&msg1);
-                }
+    if (status != CELIX_SUCCESS /*serialization not ok*/) {
+        L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+        return status;
+    }
 
-                //send Payload
-                if (rc > 0) {
-                    int flag = ((entry->metadataBufferSize > 0)  || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
-                    zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
-                    rc = zmq_msg_send(&msg2, socket, flag);
-                    if (rc == -1) {
-                        L_WARN("Error sending payload msg. %s", strerror(errno));
-                        zmq_msg_close(&msg2);
-                    }
-                }
+    delay_first_send_for_late_joiners(sender);
 
-                //send MetaData
-                if (rc > 0 && entry->metadataBufferSize > 0) {
-                    int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
-                    zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL);
-                    rc = zmq_msg_send(&msg3, socket, flag);
-                    if (rc == -1) {
-                        L_WARN("Error sending metadata msg. %s", strerror(errno));
-                        zmq_msg_close(&msg3);
-                    }
-                }
+    // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
+    // Also protect sender->zmqBuffers (header, meta and footer)
+    bool expected = false;
+    while(!__atomic_compare_exchange_n(&sender->zmqBuffers.dataLock, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
+        expected = false;
+        usleep(5);
+    }
 
-                //send Footer
-                if (rc > 0 && entry->footerBufferSize > 0) {
-                    zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL);
-                    rc = zmq_msg_send(&msg4, socket, 0);
-                    if (rc == -1) {
-                        L_WARN("Error sending footer msg. %s", strerror(errno));
-                        zmq_msg_close(&msg4);
-                    }
-                }
+    pubsub_protocol_message_t message;
+    message.payload.payload = serializedIoVecOutput->iov_base;
+    message.payload.length = serializedIoVecOutput->iov_len;
 
-                sendOk = rc > 0;
-            } else {
-                //no zero copy
-                zmsg_t *msg = zmsg_new();
-                zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
-                zmsg_addmem(msg, payloadData, payloadLength);
-                if (entry->metadataBufferSize > 0) {
-                    zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
-                }
-                if (entry->footerBufferSize > 0) {
-                    zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
-                }
-                int rc = zmsg_send(&msg, sender->zmq.socket);
-                sendOk = rc == 0;
+    void *payloadData = NULL;
+    size_t payloadLength = 0;
+    sender->protocol->encodePayload(sender->protocol->handle, &message, &payloadData, &payloadLength);
 
-                if (!sendOk) {
-                    zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
-                }
+    if (metadata != NULL) {
+        message.metadata.metadata = metadata;
+        sender->protocol->encodeMetadata(sender->protocol->handle, &message, &sender->zmqBuffers.metadataBuffer, &sender->zmqBuffers.metadataBufferSize);
+    } else {
+        message.metadata.metadata = NULL;
+    }
 
-                // Note: serialized Payload is deleted by serializer
-                if (payloadData && (payloadData != message.payload.payload)) {
-                    free(payloadData);
-                }
+    sender->protocol->encodeFooter(sender->protocol->handle, &message, &sender->zmqBuffers.footerBuffer, &sender->zmqBuffers.footerBufferSize);
+
+    message.header.msgId = msgTypeId;
+    message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+    message.header.msgMajorVersion = majorVersion;
+    message.header.msgMinorVersion = minorversion;
+    message.header.payloadSize = payloadLength;
+    message.header.metadataSize = sender->zmqBuffers.metadataBufferSize;
+    message.header.payloadPartSize = payloadLength;
+    message.header.payloadOffset = 0;
+    message.header.isLastSegment = 1;
+    message.header.convertEndianess = 0;
+
+    sender->protocol->encodeHeader(sender->protocol->handle, &message, &sender->zmqBuffers.headerBuffer, &sender->zmqBuffers.headerBufferSize);
+
+    errno = 0;
+    bool sendOk;
+    if (bound->parent->zeroCopyEnabled) {
+        zmq_msg_t msg1; // Header
+        zmq_msg_t msg2; // Payload
+        zmq_msg_t msg3; // Metadata
+        zmq_msg_t msg4; // Footer
+        void *socket = zsock_resolve(sender->zmq.socket);
+        psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); //NOTE should be improved. Not really zero copy
+        freeMsgEntry->serHandler = sender->serializerHandler;
+        freeMsgEntry->msgId = msgTypeId;
+        freeMsgEntry->serializedOutput = serializedIoVecOutput;
+        freeMsgEntry->serializedOutputLen = serializedIoVecOutputLen;
+
+        zmq_msg_init_data(&msg1, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize, NULL, NULL);
+        //send header
+        int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+        if (rc == -1) {
+            L_WARN("Error sending header msg. %s", strerror(errno));
+            zmq_msg_close(&msg1);
+        }
 
-                __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
+        //send Payload
+        if (rc > 0) {
+            int flag = ((sender->zmqBuffers.metadataBufferSize > 0)  || (sender->zmqBuffers.footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
+            zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+            rc = zmq_msg_send(&msg2, socket, flag);
+            if (rc == -1) {
+                L_WARN("Error sending payload msg. %s", strerror(errno));
+                zmq_msg_close(&msg2);
             }
-            pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata);
+        }
 
-            if (message.metadata.metadata) {
-                celix_properties_destroy(message.metadata.metadata);
-            }
-            if (!bound->parent->zeroCopyEnabled && serializedOutput) {
-                pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
+        //send MetaData
+        if (rc > 0 && sender->zmqBuffers.metadataBufferSize > 0) {
+            int flag = (sender->zmqBuffers.footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
+            zmq_msg_init_data(&msg3, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize, NULL, NULL);
+            rc = zmq_msg_send(&msg3, socket, flag);
+            if (rc == -1) {
+                L_WARN("Error sending metadata msg. %s", strerror(errno));
+                zmq_msg_close(&msg3);
             }
+        }
 
-            if (sendOk) {
-                sendCountUpdate = 1;
-            } else {
-                sendErrorUpdate = 1;
-                L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+        //send Footer
+        if (rc > 0 && sender->zmqBuffers.footerBufferSize > 0) {
+            zmq_msg_init_data(&msg4, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize, NULL, NULL);
+            rc = zmq_msg_send(&msg4, socket, 0);
+            if (rc == -1) {
+                L_WARN("Error sending footer msg. %s", strerror(errno));
+                zmq_msg_close(&msg4);
             }
-        } else {
-            L_WARN("no continue");
         }
+        sendOk = rc > 0;
     } else {
-        serializationErrorUpdate = 1;
-        L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-    }
-
-    if (monitor) {
-        celixThreadMutex_lock(&entry->metrics.mutex);
+        //no zero copy
+        zmsg_t *msg = zmsg_new();
+        zmsg_addmem(msg, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize);
+        zmsg_addmem(msg, payloadData, payloadLength);
+        if (sender->zmqBuffers.metadataBufferSize > 0) {
+            zmsg_addmem(msg, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize);
+        }
+        if (sender->zmqBuffers.footerBufferSize > 0) {
+            zmsg_addmem(msg, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize);
+        }
+        int rc = zmsg_send(&msg, sender->zmq.socket);
+        sendOk = rc == 0;
 
-        double n = (double)(entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed);
-        double diff = celix_difftime(&serializationStart, &serializationEnd);
-        double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
-        entry->metrics.averageSerializationTimeInSeconds = average;
+        if (!sendOk) {
+            zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+        }
 
-        if (entry->metrics.nrOfMessagesSend > 2) {
-            diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
-            n = entry->metrics.nrOfMessagesSend;
-            average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1);
-            entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+        // Note: serialized Payload is deleted by serializer
+        if (payloadData && (payloadData != message.payload.payload)) {
+            free(payloadData);
         }
+    }
+    __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE);
+    pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
 
-        entry->metrics.lastMessageSend = sendTime;
-        entry->metrics.nrOfMessagesSend += sendCountUpdate;
-        entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
-        entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+    if (message.metadata.metadata) {
+        celix_properties_destroy(message.metadata.metadata);
+    }
+    if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) {
+        pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+    }
 
-        celixThreadMutex_unlock(&entry->metrics.mutex);
+    if (!sendOk) {
+        L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
     }
 
     return status;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index 584b88d..bb49a2a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -51,9 +51,5 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender);
 const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
 long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender);
 
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender);
 
 #endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H