You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 20:02:42 UTC

[celix] branch feature/tcp_admin_segm updated: Add message segmentation to tcp admin

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/tcp_admin_segm
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/tcp_admin_segm by this push:
     new 5ad77fe  Add message segmentation to tcp admin
5ad77fe is described below

commit 5ad77fe50dfaea2f79e17b9e8d5e55ffba89da11
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Tue Oct 6 22:02:27 2020 +0200

    Add message segmentation to tcp admin
---
 bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt     |   1 +
 .../pubsub/pubsub_admin_tcp/src/psa_activator.c    |   4 +-
 .../src/pubsub_psa_tcp_constants.h                 |  34 +-
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c |  45 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_common.h       |   2 +
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 719 +++++++++++----------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   8 +-
 .../src/pubsub_tcp_topic_receiver.c                | 176 +++--
 .../src/pubsub_tcp_topic_receiver.h                |   3 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 169 +++--
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h |  11 +-
 .../src/pubsub_topology_manager.c                  |   6 +-
 .../src/pubsub_topology_manager.h                  |   2 +-
 .../pubsub/pubsub_utils/include/pubsub_utils_url.h |   8 +-
 bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c |  22 +-
 bundles/pubsub/test/CMakeLists.txt                 |  28 +-
 bundles/pubsub/test/meta_data/ping2.properties     |   2 +-
 bundles/pubsub/test/meta_data/pong2.properties     |   4 +-
 bundles/pubsub/test/test/loopback_activator.c      |   3 +-
 bundles/pubsub/test/test/tst_endpoint_activator.c  |   2 +-
 20 files changed, 651 insertions(+), 598 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
index a3bedf4..a19b425 100644
--- a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
@@ -28,6 +28,7 @@ add_celix_bundle(celix_pubsub_admin_tcp
         src/pubsub_tcp_topic_sender.c
         src/pubsub_tcp_topic_receiver.c
         src/pubsub_tcp_handler.c
+        src/pubsub_tcp_common.c
 )
 
 set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
index a5ae576..d93c478 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -121,9 +121,7 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
         celix_properties_t *props = celix_properties_create();
         celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp");
         celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp");
-        celix_properties_set(props,
-                             CELIX_SHELL_COMMAND_DESCRIPTION,
-                             "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+        celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
         act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 4284042..1c28086 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,19 +23,20 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY                      "PSA_TCP_SEND_DELAY"
 
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
+#define PSA_TCP_DEFAULT_SEND_DELAY              250 //  250 ms
 
 #define PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE        30
 #define PSA_TCP_DEFAULT_QOS_CONTROL_SCORE       70
@@ -102,21 +103,30 @@
  */
 #define PUBSUB_TCP_STATIC_CONNECT_URLS          "tcp.static.connect.urls"
 
+
 /**
- * Name of environment variable with space-separated list of ips/urls to connect to
- * e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
+ * Defines if the publisher / subscriber is a passive endpoint and shares
+ * the connection with publisher / subscriber endpoint with the matching (passive) key
  */
-#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
+#define PUBSUB_TCP_PASSIVE_CONFIGURED            "tcp.passive.configured"
+#define PUBSUB_TCP_PASSIVE_KEY                   "tcp.passive.key"
 
 /**
- * The static endpoint type which a static endpoint should be configured.
- * Can be set in the topic properties.
+ * Name of environment variable to indicate that passive endpoint is configured
+ * e.g. PSA_TCP_PASSIVE_CONFIGURED_topic_scope="true"
  */
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE         "tcp.static.endpoint.type"
-
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER  "server"
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT  "client"
+#define PUBSUB_TCP_PASSIVE_ENABLED               "PSA_TCP_PASSIVE_CONFIGURED_"
+/**
+ * Name of environment variable to configure the passive key (see PUBSUB_TCP_PASSIVE_KEY )
+ * e.g. PSA_TCP_PASSIVE_KEY_topic_scope="tcp://localhost:4444"
+ */
+#define PUBSUB_TCP_PASSIVE_SELECTION_KEY         "PSA_TCP_PASSIVE_KEY_"
 
+/**
+ * Name of environment variable with space-separated list of ips/urls to connect to
+ * e.g. PSA_TCP_STATIC_CONNECT_URL_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
+ */
+#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
 
 /**
  * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index e7ad284..086837f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -434,17 +434,19 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             const char *protType = protEntry->protType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, protType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
 
             celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
-            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
-
+            if (pubsub_tcpTopicSender_isPassive(sender)) {
+                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+            } else {
+                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+            }
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL)
-                celix_properties_set(newEndpoint, "container_name", cn);
+              celix_properties_set(newEndpoint, "container_name", cn);
             hashMap_put(psa->topicSenders.map, key, sender);
         } else {
             L_ERROR("[PSA TCP] Error creating a TopicSender");
@@ -636,24 +638,13 @@ pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
-    const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
 
     if (url == NULL) {
         L_WARN("[PSA TCP] Error got endpoint without tcp url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            if (scope == NULL && eScope == NULL) {
-                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
-            } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) {
-                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
-            }
-        }
+        pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
     }
 
     return status;
@@ -689,6 +680,21 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
                                     FILE *errStream __attribute__((unused))) {
     pubsub_tcp_admin_t *psa = handle;
     celix_status_t status = CELIX_SUCCESS;
+    char *line = celix_utils_strdup(commandLine);
+    char *token = line;
+    strtok_r(line, " ", &token); //first token is command name
+    strtok_r(NULL, " ", &token); //second token is sub command
+
+    if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    if (celix_utils_stringEquals(token, "nr_of_senders")) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    }
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
@@ -707,11 +713,12 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
         const char *scope = pubsub_tcpTopicSender_scope(sender);
         const char *topic = pubsub_tcpTopicSender_topic(sender);
         const char *url = pubsub_tcpTopicSender_url(sender);
+        const char *isPassive = pubsub_tcpTopicSender_isPassive(sender) ? " (passive)" : "";
         const char *postUrl = pubsub_tcpTopicSender_isStatic(sender) ? " (static)" : "";
         fprintf(out, "|- Topic Sender %s/%s\n", scope == NULL ? "(null)" : scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
         fprintf(out, "   |- protocol type = %s\n", protType);
-        fprintf(out, "   |- url            = %s%s\n", url, postUrl);
+        fprintf(out, "   |- url            = %s%s%s\n", url, postUrl, isPassive);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     celixThreadMutex_unlock(&psa->protocols.mutex);
@@ -788,4 +795,4 @@ pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
 
     return result;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index 1fe1d00..9ea31db 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -28,4 +28,6 @@ typedef struct pubsub_tcp_endPointStore {
     hash_map_t *map;
 } pubsub_tcp_endPointStore_t;
 
+bool psa_tcp_isPassive(const char* buffer);
+
 #endif //CELIX_PUBSUB_TCP_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 0cbd28f..51103f0 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -31,6 +31,7 @@
 #include <errno.h>
 #include <array_list.h>
 #include <pthread.h>
+#include <sys/ioctl.h>
 #if defined(__APPLE__)
 #include <sys/types.h>
 #include <sys/event.h>
@@ -78,18 +79,26 @@ typedef struct psa_tcp_connection_entry {
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
+    unsigned int maxMsgSize;
     unsigned int syncSize;
     unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
+    unsigned int readHeaderBufferSize; // Size of readHeaderBuffer, size = 0, no readHeaderBuffer -> header included in payload
+    void *readHeaderBuffer;
+    unsigned int writeHeaderBufferSize; // Size of writeHeaderBuffer, size = 0, no writeHeaderBuffer -> header included in payload
+    void *writeHeaderBuffer;
+    unsigned int readFooterSize;
+    void *readFooterBuffer;
+    unsigned int writeFooterSize;
+    void *writeFooterBuffer;
     unsigned int bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
     unsigned int retryCount;
+    celix_thread_mutex_t writeMutex;
+    celix_thread_mutex_t readMutex;
 } psa_tcp_connection_entry_t;
 
 //
@@ -114,38 +123,25 @@ struct pubsub_tcpHandler {
     celix_log_helper_t *logHelper;
     pubsub_protocol_service_t *protocol;
     unsigned int bufferSize;
-    unsigned int maxNofBuffer;
+    unsigned int maxMsgSize;
     unsigned int maxSendRetryCount;
     unsigned int maxRcvRetryCount;
     double sendTimeout;
     double rcvTimeout;
     celix_thread_t thread;
     bool running;
+    bool enableReceiveEvent;
 };
 
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
-                              struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
 static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
 static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
 static void *pubsub_tcpHandler_thread(void *data);
 
 //
@@ -167,7 +163,6 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protoco
         handle->logHelper = logHelper;
         handle->protocol = protocol;
         handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
-        handle->maxNofBuffer = 1; // Reserved for future Use;
         celixThreadRwlock_create(&handle->dbLock, 0);
         handle->running = true;
         celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, handle);
@@ -261,7 +256,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
             }
         }
-        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if (addr) {
             rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc != 0) {
@@ -311,6 +306,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
     if (fd >= 0) {
         entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
         entry->fd = fd;
+        celixThreadMutex_create(&entry->writeMutex, NULL);
+        celixThreadMutex_create(&entry->readMutex, NULL);
         if (url)
             entry->url = strndup(url, 1024 * 1024);
         if (interface_url) {
@@ -326,17 +323,29 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         handle->protocol->getHeaderSize(handle->protocol->handle, &size);
         entry->headerSize = size;
         handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size);
-        entry->headerBufferSize = size;
+        entry->readHeaderBufferSize = size;
+        entry->writeHeaderBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
         handle->protocol->getFooterSize(handle->protocol->handle, &size);
-        entry->footerSize = size;
+        entry->readFooterSize = size;
+        entry->writeFooterSize = size;
         entry->bufferSize = handle->bufferSize;
+        // In case the header is included in the payload buffer, make the payload buffer at least the size of the header
+        if (!entry->readHeaderBufferSize) {
+            entry->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        }
         entry->connected = false;
-        if (entry->headerBufferSize) {
-            entry->headerBuffer = calloc(sizeof(char), entry->headerSize);
+        unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterSize;
+        if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) {
+            L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize);
+        } else {
+            entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : UINT32_MAX;
         }
-        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+        if (entry->readHeaderBufferSize) entry->readHeaderBuffer = calloc(sizeof(char), entry->readHeaderBufferSize);
+        if (entry->writeHeaderBufferSize) entry->writeHeaderBuffer = calloc(sizeof(char), entry->writeHeaderBufferSize);
+        if (entry->readFooterSize) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterSize);
+        if (entry->writeFooterSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterSize);
         if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
     }
     return entry;
@@ -348,40 +357,18 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
 static inline void
 pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
     if (entry) {
-        if (entry->url) {
-            free(entry->url);
-            entry->url = NULL;
-        }
-        if (entry->interface_url) {
-            free(entry->interface_url);
-            entry->interface_url = NULL;
-        }
-        if (entry->fd >= 0) {
-            close(entry->fd);
-            entry->fd = -1;
-        }
-        if (entry->buffer) {
-            free(entry->buffer);
-            entry->buffer = NULL;
-            entry->bufferSize = 0;
-        }
-        if (entry->headerBuffer) {
-            free(entry->headerBuffer);
-            entry->headerBuffer = NULL;
-            entry->headerBufferSize = 0;
-        }
-
-        if (entry->footerBuffer) {
-            free(entry->footerBuffer);
-            entry->footerBuffer = NULL;
-        }
-
-        if (entry->metaBuffer) {
-            free(entry->metaBuffer);
-            entry->metaBuffer = NULL;
-            entry->metaBufferSize = 0;
-        }
-        entry->connected = false;
+        free(entry->url);
+        free(entry->interface_url);
+        if (entry->fd >= 0) close(entry->fd);
+        free(entry->buffer);
+        free(entry->readHeaderBuffer);
+        free(entry->writeHeaderBuffer);
+        free(entry->readFooterBuffer);
+        free(entry->writeFooterBuffer);
+        free(entry->readMetaBuffer);
+        free(entry->writeMetaBuffer);
+        celixThreadMutex_destroy(&entry->writeMutex);
+        celixThreadMutex_destroy(&entry->readMutex);
         free(entry);
     }
 }
@@ -403,8 +390,7 @@ pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsign
 //
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
     int rc = 0;
-    psa_tcp_connection_entry_t *entry =
-        hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
     if (entry == NULL) {
         pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
         int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
@@ -414,12 +400,11 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
         socklen_t len = sizeof(sin);
         getsockname(fd, (struct sockaddr *) &sin, &len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
-        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if ((rc >= 0) && addr) {
             rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc < 0 && errno != EINPROGRESS) {
-                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url,
-                        strerror(errno));
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno));
                 close(fd);
             } else {
                 entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &sin);
@@ -552,7 +537,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
     else {
         rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
         if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
         }
     }
     return rc;
@@ -570,6 +555,7 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
     if (entry == NULL) {
         char protocol[] = "tcp";
         int fd = pubsub_tcpHandler_open(handle, url);
+        rc = fd;
         struct sockaddr_in *sin = pubsub_utils_url_from_fd(fd);
         // Make handler fd entry
         char *pUrl = pubsub_utils_url_get_url(sin, protocol);
@@ -579,7 +565,6 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
             free(pUrl);
             free(sin);
             celixThreadRwlock_writeLock(&handle->dbLock);
-            rc = fd;
             if (rc >= 0) {
                 rc = listen(fd, SOMAXCONN);
                 if (rc != 0) {
@@ -628,16 +613,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
 }
 
 //
-// Setup buffer sizes
+// Setup receive buffer size
 //
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers
-                                               __attribute__((__unused__)),
-                                               unsigned int bufferSize) {
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
-        handle->bufferSize = bufferSize;
-        handle->maxNofBuffer = maxNofBuffers;
+        handle->bufferSize = size;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+    return 0;
+}
+
+//
+// Set Maximum message size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxMsgSize = size;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -708,7 +701,7 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
             if (prio > 0 && prio < 100) {
                 struct sched_param sch;
                 bzero(&sch, sizeof(struct sched_param));
-                sch.sched_priority = prio;
+                sch.sched_priority = (int)prio;
                 pthread_setschedparam(handle->thread.thread, policy, &sch);
             } else {
                 L_INFO("Skipping configuration of thread prio to %i and thread "
@@ -752,35 +745,23 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
-    char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
-        // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
-        // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->enableReceiveEvent = enable;
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    if (nbytes <=0)  msgSize = nbytes;
-    return msgSize;
 }
 
-
 static inline
 void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
 
   if (entry->header.header.payloadSize > 0) {
-    handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
+      handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
   }
   if (entry->header.header.metadataSize > 0) {
-    handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
-                                     entry->header.header.metadataSize, &entry->header);
-    entry->metaBufferSize = entry->header.header.metadataSize;
+      handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer,
+                                       entry->header.header.metadataSize, &entry->header);
   }
   if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
     struct timespec receiveTime;
@@ -791,13 +772,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
   }
 }
 
-
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
-    celixThreadRwlock_writeLock(&handle->dbLock);
+    celixThreadRwlock_readLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
     if (entry == NULL)
         entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -811,125 +791,157 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
         celixThreadRwlock_unlock(&handle->dbLock);
         return -1;
     }
+    int nofBytesInReadBuffer = 0;
+    if (ioctl(fd, FIONREAD, &nofBytesInReadBuffer)) {
+        L_ERROR("[TCP Socket] socket: %d, url: %s, cannot  read nof bytes in socket read buffer \n", entry->fd, entry->url);
+    }
+    // if socket buffer is not filled, return out of function
+    if (!nofBytesInReadBuffer) {
+        celixThreadRwlock_unlock(&handle->dbLock);
+        return 1;
+    }
+    celixThreadMutex_lock(&entry->readMutex);
+    // When header is included in payload buffer, allocate buffer.
+    // bufferSize is at least the header size
+    if ((entry->buffer == NULL) && (entry->readHeaderBufferSize == 0)) {
+        entry->buffer  = malloc((size_t) handle->bufferSize);
+    }
+    struct msghdr msg;
+    struct iovec msg_iov[IOV_MAX];
+    memset(&msg, 0x00, sizeof(struct msghdr));
+    msg.msg_iov = msg_iov;
 
-    // Message buffer is to small, reallocate to make it bigger
-    if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
-        if (entry->buffer) free(entry->buffer);
-            entry->buffer = malloc((size_t) handle->bufferSize);
-            entry->bufferSize = handle->bufferSize;
-        }
     // Read the message
-    bool validMsg = false;
-    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
-    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
-    if (nbytes > 0) {
-        // Check header message buffer
-        if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
+    msg.msg_iovlen = 0;
+    msg.msg_iov[msg.msg_iovlen].iov_base = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer : entry->buffer;
+    msg.msg_iov[msg.msg_iovlen].iov_len  = entry->headerSize;
+    msg.msg_iovlen++;
+    long int msgSize = 0;
+    long int nbytes = recvmsg(fd, &msg, MSG_PEEK | MSG_NOSIGNAL);
+    if (nbytes >= entry->headerSize) {
+        msg.msg_iovlen--;
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           msg.msg_iov[msg.msg_iovlen].iov_base,
+                                           msg.msg_iov[msg.msg_iovlen].iov_len,
+                                           &entry->header) != CELIX_SUCCESS) {
             // Did not receive correct header
             // skip sync word and try to read next header
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
             if (!entry->headerError) {
                 L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
             }
             entry->headerError = true;
-            entry->bufferReadSize = 0;
+            msg.msg_iov[msg.msg_iovlen].iov_len = entry->syncSize;
+            msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            msg.msg_iovlen++;
         } else {
-            // Read header message from queue
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
-            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                entry->headerError = false;
-                // For headerless message, add header to bufferReadSize;
-                if (!entry->headerBufferSize)
-                    entry->bufferReadSize += nbytes;
-                // Alloc message buffers
-                if (entry->header.header.payloadSize > entry->bufferSize) {
-                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-                    if (entry->buffer)
-                        free(entry->buffer);
-                    entry->buffer = malloc((size_t) handle->bufferSize);
+            // Alloc message buffers
+            if (entry->header.header.payloadSize > entry->bufferSize) {
+                handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+                char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
+                if (buffer) {
+                    entry->buffer = buffer;
                     entry->bufferSize = handle->bufferSize;
                 }
-                if (entry->header.header.metadataSize > entry->metaBufferSize) {
-                    if (entry->metaBuffer) {
-                      free(entry->metaBuffer);
-                    }
-                    entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
-                    entry->metaBufferSize = entry->header.header.metadataSize;
-                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
-                      entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-                }
+            }
 
-                if (entry->header.header.payloadSize) {
-                    unsigned int offset = entry->header.header.payloadOffset;
-                    unsigned int size = entry->header.header.payloadPartSize;
-                    // For header less messages adjust offset and msg size;
-                    if (!entry->headerBufferSize) {
-                        offset = entry->headerSize;
-                        size -= offset;
-                    }
-                    // Read payload data from queue
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
-                    if (nbytes > 0) {
-                        if (nbytes == size) {
-                            entry->bufferReadSize += nbytes;
-                        } else {
-                            entry->bufferReadSize = 0;
-                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                        }
-                    }
-                }
-                if (nbytes > 0 && entry->header.header.metadataSize) {
-                    // Read meta data from queue
-                    unsigned int size = entry->header.header.metadataSize;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
-                    if ((nbytes > 0) && (nbytes != size)) {
-                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                    }
-                }
-                // Check for end of message using, footer of message. Because of streaming protocol
-                if (nbytes > 0) {
-                    if (entry->footerSize > 0) {
-                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
-                        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
-                            // valid footer, this means that the message is valid
-                            validMsg = true;
-                        } else {
-                            // Did not receive correct header
-                            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
-                            entry->bufferReadSize = 0;
-                        }
-                    } else {
-                        // No Footer, then complete message is received
-                        validMsg = true;
+#ifdef USE_TCP_PUB_SUB_BUFFER_MEMSET
+            // For Debugging
+            if ((entry->header.header.payloadOffset == 0 ) && (msgSize == entry->headerSize)) {
+                memset(entry->buffer, 0x00, entry->bufferSize);
+            }
+#endif
+            entry->headerError = false;
+            if (entry->readHeaderBufferSize) {
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->header.header.payloadPartSize) {
+                char* buffer = entry->buffer;
+                msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset];
+                msg.msg_iov[msg.msg_iovlen].iov_len = entry->header.header.payloadPartSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->header.header.metadataSize) {
+                if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
+                    char *buffer = realloc(entry->readMetaBuffer, (size_t) entry->header.header.metadataSize);
+                    if (buffer) {
+                        entry->readMetaBuffer = buffer;
+                        entry->readMetaBufferSize = entry->header.header.metadataSize;
+                        L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
+                               entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize);
                     }
                 }
+                msg.msg_iov[msg.msg_iovlen].iov_base = entry->readMetaBuffer;
+                msg.msg_iov[msg.msg_iovlen].iov_len  = entry->header.header.metadataSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
             }
+            if (entry->readFooterSize) {
+                msg.msg_iov[msg.msg_iovlen].iov_base = entry->readFooterBuffer;
+                msg.msg_iov[msg.msg_iovlen].iov_len = entry->readFooterSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+        }
+        // If there is data in the buffer, read it using a blocking read
+        nbytes = recvmsg(fd, &msg, MSG_NOSIGNAL | MSG_WAITALL);
+        if (nbytes < msgSize) {
+            L_ERROR("[TCP Socket] socket: %d, url: %s, read nbytes %d != msgSize %d\n", entry->fd, entry->url, (int)nbytes, (int)msgSize);
         }
     }
-    if (nbytes > 0) {
-        entry->retryCount = 0;
-        // Check if complete message is received
-        if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
-            entry->bufferReadSize = 0;
+    if ((nbytes >= msgSize)&&(!entry->headerError))  {
+        bool valid = true;
+        if (entry->readFooterSize) {
+            if (handle->protocol->decodeFooter(handle->protocol->handle,
+                                               entry->readFooterBuffer,
+                                               entry->readFooterSize,
+                                               &entry->header) != CELIX_SUCCESS) {
+
+                // Did not receive correct footer
+                L_ERROR(
+                    "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)",
+                    entry->header.header.seqNr,
+                    entry->fd,
+                    entry->url);
+                valid = false;
+            }
+        }
+        if (!entry->header.header.isLastSegment) {
+            // Not last Segment of message
+            valid = false;
+        }
+
+        if (valid) {
+            // Complete message is received
             pubsub_tcpHandler_decodePayload(handle, entry);
         }
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
+    }
+
+    if (nbytes > 0) {
+        entry->retryCount = 0;
+    } else if (nbytes < 0) {
+        if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+            // Non blocking interrupt
+            entry->retryCount = 0;
+        } else if (entry->retryCount < handle->maxRcvRetryCount) {
             entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
-                   strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+            L_WARN(
+                "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
+                entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
         } else {
             L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
                     entry->fd, handle->maxRcvRetryCount, strerror(errno));
             nbytes = 0; //Return 0 as indicator to close the connection
         }
+    } else {
+        L_WARN("[TCP Socket] No message received (fd: %d), error(%d): %s, nbytes : %d", entry->fd, errno, strerror(errno), (int)nbytes);
     }
+    celixThreadMutex_unlock(&entry->readMutex);
     celixThreadRwlock_unlock(&handle->dbLock);
-    return nbytes;
+    return (int)nbytes;
 }
 
-
 int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
                                         pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
     int result = 0;
@@ -964,58 +976,23 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
     return result;
 }
 
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
-  int nbytes = 0;
-  int msgSize = 0;
-  if (entry->fd >= 0 && size && msg->msg_iovlen) {
-    int expectedReadSize = size;
-    unsigned int offset = 0;
-    nbytes = size;
-    while (nbytes > 0 && expectedReadSize > 0) {
-      // Read the message header
-      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
-      // Update admin
-      expectedReadSize -= nbytes;
-      msgSize += nbytes;
-      // Not all written
-      if (expectedReadSize && nbytes > 0) {
-        unsigned int readSize = 0;
-        unsigned int readIndex = 0;
-        unsigned int i = 0;
-        for (i = 0; i < msg->msg_iovlen; i++) {
-          if (nbytes < msg->msg_iov[i].iov_len) {
-            readIndex = i;
-            break;
-          }
-          readSize+= msg->msg_iov[i].iov_len;
-        }
-        msg->msg_iov = &msg->msg_iov[readIndex];
-        msg->msg_iovlen -= readIndex;
-        char* buffer = (char*)msg->msg_iov->iov_base;
-        offset = nbytes - readSize;
-        msg->msg_iov->iov_base = &buffer[offset];
-        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
-      }
-    }
-  }
-  if (nbytes <=0)  msgSize = nbytes;
-  return msgSize;
-}
+
 //
 // Write large data to TCP. .
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
     int nofConnToClose = 0;
     if (handle) {
+        celixThreadRwlock_readLock(&handle->dbLock);
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
+        size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->connected) continue;
+            celixThreadMutex_lock(&entry->writeMutex);
             void *payloadData = NULL;
             size_t payloadSize = 0;
             if (msg_iov_len == 1) {
@@ -1025,6 +1002,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     payloadSize += msgIoVec[i].iov_len;
                 }
             }
+            // When maxMsgSize is zero then payloadSize is disabled
+            if (!entry->maxMsgSize) payloadSize = 0;
+
             message->header.convertEndianess = 0;
             message->header.payloadSize = payloadSize;
             message->header.payloadPartSize = payloadSize;
@@ -1034,130 +1014,174 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
                 handle->protocol->encodeMetadata(handle->protocol->handle, message,
                                                  &metadataData,
                                                  &metadataSize);
-                entry->metaBufferSize = metadataSize;
             }
+            // When maxMsgSize is zero or smaller than the metadata size, metadata is disabled
+            if (!entry->maxMsgSize || (metadataSize > entry->maxMsgSize)) metadataSize = 0;
+
             message->header.metadataSize = metadataSize;
+            size_t totalMessageSize = payloadSize + metadataSize;
+
+            bool isMessageSegmentationSupported = false;
+            handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported);
+            if (((!isMessageSegmentationSupported) && (msg_iov_len > max_msg_iov_len)) ||
+                ((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) {
+                L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n",
+                       entry->fd);
+                celixThreadMutex_unlock(&entry->writeMutex);
+                continue;
+            }
 
             void *footerData = NULL;
             size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
+            if (entry->writeFooterSize) {
+                footerDataSize = entry->writeFooterSize;
+                footerData = entry->writeFooterBuffer;
                 handle->protocol->encodeFooter(handle->protocol->handle, message,
                                                  &footerData,
                                                  &footerDataSize);
-                entry->footerSize = footerDataSize;
+                entry->writeFooterSize = MAX(footerDataSize, entry->writeFooterSize);
+                if (footerData && entry->writeFooterBuffer != footerData) entry->writeFooterBuffer = footerData;
             }
 
             size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
+            size_t msgPayloadSize = 0;
+            size_t msgMetaDataSize = 0;
+            size_t msgIovLen = 0;
+            long int nbytes = UINT32_MAX;
+            while (msgSize < totalMessageSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                // Write generic serialized payload in vector buffer
+                if (msgPayloadSize < payloadSize) {
+                    if (payloadSize && payloadData && entry->maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iovlen++;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadSize];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadSize), entry->maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        message->header.payloadPartSize = msgPartSize;
+                        message->header.payloadOffset   = msgPayloadSize;
+                        msgPayloadSize += message->header.payloadPartSize;
+                        msgSize = msgPayloadSize;
+                    } else {
+                        // copy serialized vector into vector buffer
+                        for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
+                            msg.msg_iovlen++;
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
+                            if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) {
+                                msg.msg_iovlen--;
+                                break;
+                            }
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        }
+                        msgIovLen += msg.msg_iovlen;
+                        message->header.payloadOffset = msgPayloadSize;
+                        message->header.payloadPartSize = msgPartSize;
+                        msgPayloadSize  += message->header.payloadPartSize;
+                        msgSize = msgPayloadSize;
+                    }
+                }
 
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+                // Write optional metadata in vector buffer
+                if ((msgPayloadSize >= payloadSize) &&
+                    (msgMetaDataSize < metadataSize) &&
+                    (msgPartSize < entry->maxMsgSize) &&
+                    (msg.msg_iovlen+1 < max_msg_iov_len-1) &&
+                    (metadataSize && metadataData)) {
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    message->header.metadataSize = metadataSize;
+                    msgSize += metadataSize;
+                }
+                if (msgSize >= totalMessageSize) {
+                    message->header.isLastSegment = 0x1;
                 }
-            }
-
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                // Write optional footerData in vector buffer
+                if (footerData && footerDataSize) {
+                    msg.msg_iovlen++;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+                void *headerData = NULL;
+                size_t headerSize = entry->writeHeaderBufferSize;
+                // Encode a separate header only when the header is not part of the payload (writeHeaderBufferSize != 0)
+                if (entry->writeHeaderBufferSize) {
+                    headerSize = entry->writeHeaderBufferSize;
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message,
+                                                   &headerData,
+                                                   &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) entry->writeHeaderBuffer = headerData;
+                }
+                if (!entry->writeHeaderBufferSize) {
+                    // Skip header buffer, when header is part of payload;
+                    msg.msg_iov = &msg_iov[1];
+                } else if (headerSize && headerData) {
+                    // Write header in 1st vector buffer item
+                    msg.msg_iov[0].iov_base = headerData;
+                    msg.msg_iov[0].iov_len = headerSize;
+                    msgPartSize += msg.msg_iov[0].iov_len;
+                    msg.msg_iovlen++;
                 } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    L_ERROR("[TCP Socket] No header buffer is generated");
+                    msg.msg_iovlen = 0;
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+                //  A socket that keeps reporting errors can indicate a subscriber that is
+                //  no longer active; the connection remains until the retry counter
+                //  exceeds the maximum retry count.
+                //  Note that a SIGSTOP issued by a debugging tool can also result in an EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
                 }
             }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
-            }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1169,12 +1193,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
 // get interface URL
 //
 char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
-    hash_map_iterator_t interface_iter =
+    hash_map_iterator_t iter =
         hashMapIterator_construct(handle->interface_url_map);
     char *url = NULL;
-    while (hashMapIterator_hasNext(&interface_iter)) {
+    while (hashMapIterator_hasNext(&iter)) {
         psa_tcp_connection_entry_t *entry =
-            hashMapIterator_nextValue(&interface_iter);
+            hashMapIterator_nextValue(&iter);
         if (entry && entry->url) {
             if (!url) {
                 url = celix_utils_strdup(entry->url);
@@ -1187,6 +1211,32 @@ char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
     }
     return url;
 }
+//
+// get connection URL: space-separated interface URLs (or full URLs) of all connection entries
+//
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+    hash_map_iterator_t iter =
+            hashMapIterator_construct(handle->connection_url_map);
+    char *url = NULL;
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_connection_entry_t *entry =
+                hashMapIterator_nextValue(&iter);
+        if (entry && entry->url) {
+            if (!url) {
+                pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+                url = celix_utils_strdup(url_info->interface_url ? url_info->interface_url : entry->url);
+                pubsub_utils_url_free(url_info);
+            } else {
+                char *tmp = url;
+                pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+                asprintf(&url, "%s %s", tmp, url_info->interface_url ? url_info->interface_url : entry->url);
+                pubsub_utils_url_free(url_info);
+                free(tmp);
+            }
+        }
+    }
+    return url;
+}
 
 //
 // Handle non-blocking accept (sender)
@@ -1216,7 +1266,8 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
 #else
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+        event.events = EPOLLRDHUP | EPOLLERR;
+        if (handle->enableReceiveEvent) event.events |= EPOLLIN;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..2d97634 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,15 +58,14 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers,
-                                               unsigned int bufferSize);
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
 void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
 void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle, bool enable);
 
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
@@ -86,6 +85,7 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle,
                                                   pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
                                                   pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback);
 char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle);
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle);
 void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, const char *sched);
 void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle, const char *topic, const char *scope);
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 4fa4586..985af6b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -24,20 +24,17 @@
 #include <pubsub/subscriber.h>
 #include <memory.h>
 #include <pubsub_constants.h>
-#include <assert.h>
-#include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
-#include <math.h>
 #include "pubsub_tcp_handler.h"
 #include "pubsub_tcp_topic_receiver.h"
 #include "pubsub_psa_tcp_constants.h"
 #include "pubsub_tcp_common.h"
 
-#include "celix_utils_api.h"
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
 
 #define MAX_EPOLL_EVENTS     16
@@ -65,8 +62,10 @@ struct pubsub_tcp_topic_receiver {
     char *topic;
     size_t timeout;
     bool metricsEnabled;
+    bool isPassive;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     struct {
         celix_thread_t thread;
@@ -118,24 +117,14 @@ typedef struct psa_tcp_subscriber_entry {
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                  const celix_bundle_t *owner);
-
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                     const celix_bundle_t *owner);
-
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void *psa_tcp_recvThread(void *data);
-
 static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
-
 static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime);
-
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-
 static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-
 static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -143,7 +132,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
                                                             const char *scope,
                                                             const char *topic,
                                                             const celix_properties_t *topicProperties,
-                                                            pubsub_tcp_endPointStore_t *endPointStore,
+                                                            pubsub_tcp_endPointStore_t *handlerStore,
                                                             long serializerSvcId,
                                                             pubsub_serializer_service_t *serializer,
                                                             long protocolSvcId,
@@ -157,52 +146,43 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    bool isServerEndPoint = false;
-
-    /* Check if it's a static endpoint */
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-    const char *staticConnectUrls = NULL;
-
-    staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if(staticConnectUrls == NULL) {
             staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
         }
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = staticConnectUrls;
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL);
-                isServerEndPoint = true;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    receiver->isPassive = psa_tcp_isPassive(isPassive);
+
     // Set receiver connection thread timeout.
     // property is in ms, timeout value in us. (convert ms to us).
     receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
                                                               PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
     /* When it's an endpoint share the socket with the sender */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticServerEndPointUrls) ? staticServerEndPointUrls : staticClientEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (receiver->socketHandler == NULL)
                 receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
             entry = receiver->socketHandler;
             receiver->sharedSocketHandler = receiver->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             receiver->socketHandler = entry;
             receiver->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
     }
@@ -213,14 +193,12 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
         double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
-        long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
-                                                              PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
-        long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
+        long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
                                                                  PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
         long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+
         pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
-        pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions,
-                                                   (unsigned int) buffer_size);
+        pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
         pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
         pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
         pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
@@ -230,8 +208,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
     }
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                     PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+                                                                          PSA_TCP_DEFAULT_METRICS_ENABLED);
     celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
     celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
     celixThreadMutex_create(&receiver->thread.mutex, NULL);
@@ -239,7 +216,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
     receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-    if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticServerEndPointUrls == NULL)) {
+    if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (!receiver->isPassive)) {
         char *urlsCopy = strndup(staticConnectUrls, 1024 * 1024);
         char *url;
         char *save = urlsCopy;
@@ -255,7 +232,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         free(urlsCopy);
     }
 
-    if (receiver->socketHandler != NULL && (!isServerEndPoint)) {
+    if (receiver->socketHandler != NULL && (!receiver->isPassive)) {
         // Configure Receiver thread
         receiver->thread.running = true;
         celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
@@ -346,7 +323,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             pubsub_tcpHandler_destroy(receiver->socketHandler);
             receiver->socketHandler = NULL;
         }
-
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -374,6 +351,17 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
                                              celix_array_list_t *unconnectedUrls) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (receiver->isPassive) {
+        char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
+        char *url = NULL;
+        asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
+        if (interface_url) {
+            celix_arrayList_add(connectedUrls, url);
+        } else {
+            celix_arrayList_add(unconnectedUrls, url);
+        }
+        free(interface_url);
+    } else {
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
@@ -383,11 +371,16 @@ void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
             celix_arrayList_add(connectedUrls, url);
         } else {
             celix_arrayList_add(unconnectedUrls, url);
+            }
         }
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver) {
+    return receiver->isPassive;
+}
+
 void pubsub_tcpTopicReceiver_connectTo(
     pubsub_tcp_topic_receiver_t *receiver,
     const char *url) {
@@ -466,8 +459,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
 
         hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
-        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
-                                                           &entry->msgTypes);
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes);
 
         if (rc == 0) {
             entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
@@ -498,7 +490,6 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
     long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
-
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
@@ -559,31 +550,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+                        if (!release && hashMapIterator_hasNext(&iter)) {
+                            //receive function has taken ownership and still more receive function to come ..
+                            //deserialize again for new message
+                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                            if (status != CELIX_SUCCESS) {
+                                L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
+                                       msgSer->msgName,
+                                       receiver->scope == NULL ? "(null)" : receiver->scope,
+                                       receiver->topic);
+                                break;
+                            }
+                            release = true;
                         }
-                        release = true;
                     }
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
+                    if (message->metadata.metadata) {
+                        celix_properties_destroy(message->metadata.metadata);
+                    }
+                    updateReceiveCount += 1;
                 }
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
-                }
-                if (message->metadata.metadata) {
-                    celix_properties_destroy(message->metadata.metadata);
-                }
-                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
@@ -650,10 +649,7 @@ static void *psa_tcp_recvThread(void *data) {
 
 pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope,
-             PUBSUB_AMDIN_METRICS_NAME_MAX,
-             "%s",
-             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
@@ -677,8 +673,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
         hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
         while (hashMapIterator_hasNext(&iter2)) {
             hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-            result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins),
-                                                 sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
             result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
             int k = 0;
             hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
@@ -694,10 +689,8 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                     result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
                     result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
                     result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
-                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
-                        metrics->averageTimeBetweenMessagesInSeconds;
-                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
-                        metrics->averageSerializationTimeInSeconds;
+                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
                     result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
                     result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
 
@@ -707,7 +700,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                            metrics->msgTypeId);
                 }
             }
-            i += 1;
+            i +=1 ;
         }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -721,7 +714,7 @@ static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
         hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if ((entry) && (!entry->connected)) {
+            if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
                 int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
                 if (rc < 0) {
                     allConnected = false;
@@ -812,12 +805,11 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t
 
     int versionMajor;
     int versionMinor;
-    if (msgVersion != NULL) {
+    if (msgVersion!=NULL) {
         version_getMajor(msgVersion, &versionMajor);
         version_getMinor(msgVersion, &versionMinor);
-        if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */
-            check = (minor >=
-                ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+        if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+            check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
         }
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
index 50d5a97..118bf11 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
@@ -32,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
                                                             const char *scope,
                                                             const char *topic,
                                                             const celix_properties_t *topicProperties,
-                                                            pubsub_tcp_endPointStore_t *endPointStore,
+                                                            pubsub_tcp_endPointStore_t *handlerStore,
                                                             long serializerSvcId,
                                                             pubsub_serializer_service_t *serializer,
                                                             long protocolSvcId,
@@ -47,6 +47,7 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver,
                                              celix_array_list_t *connectedUrls,
                                              celix_array_list_t *unconnectedUrls);
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
 
 void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
 void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index dbb5e26..105dcf1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -35,8 +35,8 @@
 #include "celix_constants.h"
 #include <signal.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 
-#define FIRST_SEND_DELAY_IN_SECONDS              2
 #define TCP_BIND_MAX_RETRY                      10
 
 #define L_DEBUG(...) \
@@ -59,13 +59,15 @@ struct pubsub_tcp_topic_sender {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;
     char *topic;
     char *url;
     bool isStatic;
-
+    bool isPassive;
     bool verbose;
+    unsigned long send_delay;
 
     struct {
         long svcId;
@@ -128,7 +130,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     const char *scope,
     const char *topic,
     const celix_properties_t *topicProperties,
-    pubsub_tcp_endPointStore_t *endPointStore,
+    pubsub_tcp_endPointStore_t *handlerStore,
     long serializerSvcId,
     pubsub_serializer_service_t *ser,
     long protocolSvcId,
@@ -144,52 +146,43 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    sender->isPassive = psa_tcp_isPassive(isPassive);
 
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls : staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
     }
@@ -197,66 +190,64 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
-                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PSA_TCP_SEND_DELAY, PSA_TCP_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+        pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
     }
 
-    //setting up tcp socket for TCP TopicSender
-    if (staticClientEndPointUrls != NULL) {
-        // Store url for client static endpoint
-        sender->url = strndup(staticClientEndPointUrls, 1024 * 1024);
-        sender->isStatic = true;
-    } else if (discUrl != NULL) {
-        urls = strndup(discUrl, 1024 * 1024);
-        sender->isStatic = true;
-    } else if (ip != NULL) {
-        urls = strndup(ip, 1024 * 1024);
-    } else {
-        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
-        urls = pubsub_utils_url_get_url(sin, NULL);
-        free(sin);
-    }
-    if (!sender->url) {
-        char *urlsCopy = strndup(urls, 1024 * 1024);
-        char *url;
-        char *save = urlsCopy;
-        while ((url = strtok_r(save, " ", &save))) {
-            int retry = 0;
-            while (url && retry < TCP_BIND_MAX_RETRY) {
-                pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
-                int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
-                if (rc < 0) {
-                    L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
-                } else {
-                    url = NULL;
+    if (!sender->isPassive) {
+        //setting up tcp socket for TCP TopicSender
+        if (discUrl != NULL) {
+            urls = strndup(discUrl, 1024 * 1024);
+            sender->isStatic = true;
+        } else if (ip != NULL) {
+            urls = strndup(ip, 1024 * 1024);
+        } else {
+            struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+            urls = pubsub_utils_url_get_url(sin, NULL);
+            free(sin);
+        }
+        if (!sender->url) {
+            char *urlsCopy = strndup(urls, 1024 * 1024);
+            char *url;
+            char *save = urlsCopy;
+            while ((url = strtok_r(save, " ", &save))) {
+                int retry = 0;
+                while (url && retry < TCP_BIND_MAX_RETRY) {
+                    pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+                    int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+                    if (rc < 0) {
+                        L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
+                    } else {
+                        url = NULL;
+                    }
+                    pubsub_utils_url_free(urlInfo);
+                    retry++;
                 }
-                pubsub_utils_url_free(urlInfo);
-                retry++;
             }
+            free(urlsCopy);
+            sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
         }
-        free(urlsCopy);
-        sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
-    }
-    if (urls)
         free(urls);
+    }
 
-    if (sender->url != NULL) {
+    //register publisher services using a service factory
+    if ((sender->url != NULL) ||  (sender->isPassive)) {
         sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
         sender->topic = strndup(topic, 1024 * 1024);
 
         celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
         sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
-    }
 
-    //register publisher services using a service factory
-    if (sender->url != NULL) {
         sender->publisher.factory.handle = sender;
         sender->publisher.factory.getService = psa_tcp_getPublisherService;
         sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
@@ -274,9 +265,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         opts.properties = props;
 
         sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
-    }
-
-    if (sender->url == NULL) {
+    } else {
         free(sender);
         sender = NULL;
     }
@@ -312,6 +301,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
             pubsub_tcpHandler_destroy(sender->socketHandler);
             sender->socketHandler = NULL;
@@ -343,13 +333,20 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
 void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
     //TODO subscriber count -> topic info
 }
@@ -483,8 +480,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se
             result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
             result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
             result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
-            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
-                mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
             result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
             result->msgMetrics[i].bndId = entry->bndId;
             result->msgMetrics[i].typeId = mEntry->type;
@@ -533,7 +529,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+        if (status == CELIX_SUCCESS /*ser ok*/ && cont) {
             pubsub_protocol_message_t message;
             message.metadata.metadata = NULL;
             message.payload.payload = NULL;
@@ -555,12 +552,12 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             entry->seqNr++;
             bool sendOk = true;
             {
-                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
-                                                 serializedIoVecOutputLen, 0);
+                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
                 if (rc < 0) {
                     status = -1;
                     sendOk = false;
                 }
+                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
                 if (message.metadata.metadata)
                     celix_properties_destroy(message.metadata.metadata);
                 if (serializedIoVecOutput) {
@@ -617,8 +614,8 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
     static bool firstSend = true;
 
     if (firstSend) {
-        L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
-        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        if (sender->send_delay ) L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+        usleep(sender->send_delay * 1000);
         firstSend = false;
     }
-}
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
index 2217989..16438c4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
@@ -33,28 +33,21 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     const char *scope,
     const char *topic,
     const celix_properties_t *topicProperties,
-    pubsub_tcp_endPointStore_t *endPointStore,
+    pubsub_tcp_endPointStore_t *handlerStore,
     long serializerSvcId,
     pubsub_serializer_service_t *ser,
     long protocolSvcId,
     pubsub_protocol_service_t *prot);
 
 void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender);
-
 const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender);
-
 const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender);
-
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
-
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
-
 long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-
 void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
-
 void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
 
 /**
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 4b4ad90..bbbcabf 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -39,7 +39,7 @@
 #include "pubsub_admin.h"
 #include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
 
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS       30L
+#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME       250 // default PSA handling thread sleep time, in milliseconds
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN    37
@@ -79,7 +79,7 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce
 
     manager->loghelper = logHelper;
     manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-    manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS);
+    manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME);
 
     manager->psaHandling.running = true;
     celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
@@ -1108,7 +1108,7 @@ static void *pstm_psaHandlingThread(void *data) {
         pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
 
         celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime, 0L);
+        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime  / 1000, (manager->handlingThreadSleepTime  % 1000) * 1000000);
         running = manager->psaHandling.running;
         celixThreadMutex_unlock(&manager->psaHandling.mutex);
     }
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 842d861..4223433 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -31,7 +31,7 @@
 #include "pubsub/subscriber.h"
 
 #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY         "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY         "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"
+#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_KEY         "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME" // property value is interpreted as milliseconds by the PSA handling thread
 #define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE     false
 
 
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index 87d4263..b10863c 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -28,16 +28,16 @@ typedef struct pubsub_utils_url {
   char *url;
   char *protocol;
   char *hostname;
-  unsigned int portnr;
+  unsigned int port_nr;
   char *uri;
   char *interface;
-  unsigned int interface_portnr;
+  unsigned int interface_port_nr;
   char *interface_url;
 } pubsub_utils_url_t;
 
 struct sockaddr_in *pubsub_utils_url_from_fd(int fd);
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port);
-char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol);
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port);
+char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol);
 char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
 bool pubsub_utils_url_is_multicast(char *hostname);
 char *pubsub_utils_url_get_multicast_ip(char *hostname);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index d8d518c..65a1ff2 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -56,7 +56,7 @@ struct sockaddr_in *pubsub_utils_url_from_fd(int fd) {
     return inp;
 }
 
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) {
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) {
     struct hostent *hp;
     struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in));
     bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
@@ -220,11 +220,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) {
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->portnr = portDigits;
+                    url_info->port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -256,11 +256,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) {
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->interface_portnr = portDigits;
+                    url_info->interface_port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -289,13 +289,13 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
             free(url_info->interface);
             url_info->interface = ip;
         }
-        struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr);
+        struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
         url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL);
         free(m_sin);
         pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info);
         free(url_info->interface);
         url_info->interface = interface_url_info.hostname;
-        url_info->interface_portnr = interface_url_info.portnr;
+        url_info->interface_port_nr = interface_url_info.port_nr;
     }
 
     if (url_info->hostname) {
@@ -306,11 +306,11 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
             free(url_info->hostname);
             url_info->hostname = ip;
         }
-        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol);
         free(url_info->hostname);
         free(sin);
-        url_info->portnr = 0;
+        url_info->port_nr = 0;
         url_info->hostname = NULL;
         pubsub_utils_url_parse_url(url_info->url, url_info);
     }
@@ -338,7 +338,7 @@ void pubsub_utils_url_free(pubsub_utils_url_t *url_info) {
     url_info->hostname = NULL;
     url_info->protocol = NULL;
     url_info->interface = NULL;
-    url_info->portnr = 0;
-    url_info->interface_portnr = 0;
+    url_info->port_nr = 0;
+    url_info->interface_port_nr = 0;
     free(url_info);
 }
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index a78958a..5c936ce 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut
 add_celix_bundle(pubsub_endpoint_tst
         #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
         SOURCES
-        test/tst_activator.c
+        test/tst_endpoint_activator.c
         VERSION 1.0.0
         )
 target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -47,7 +47,7 @@ celix_bundle_files(pubsub_endpoint_tst
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_endpoint_tst
-        meta_data/ping2.properties
+        meta_data/ping3.properties
         DESTINATION "META-INF/topics/sub"
         )
 
@@ -65,7 +65,7 @@ celix_bundle_files(pubsub_loopback
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_loopback
-        meta_data/pong2.properties
+        meta_data/pong3.properties
         DESTINATION "META-INF/topics/pub"
         )
 celix_bundle_files(pubsub_loopback
@@ -153,9 +153,9 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
@@ -189,9 +189,9 @@ if (BUILD_PUBSUB_PSA_TCP)
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v2
             pubsub_sut
             pubsub_tst
             )
@@ -211,12 +211,12 @@ if (BUILD_PUBSUB_PSA_TCP)
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v1
             pubsub_loopback
-            pubsub_endpoint_sut
             pubsub_endpoint_tst
+            pubsub_endpoint_sut
             )
     target_link_libraries(pubsub_tcp_endpoint_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
     target_include_directories(pubsub_tcp_endpoint_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
@@ -229,9 +229,9 @@ if (BUILD_PUBSUB_PSA_TCP)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
@@ -286,9 +286,9 @@ if (BUILD_PUBSUB_PSA_WS)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
@@ -319,9 +319,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
                 Celix::pubsub_serializer_json
+                Celix::pubsub_protocol_wire_v1
                 Celix::pubsub_topology_manager
                 Celix::pubsub_admin_zmq
-                Celix::pubsub_protocol_wire_v1
                 pubsub_sut
                 pubsub_tst
     )
@@ -339,9 +339,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
         BUNDLES
         Celix::pubsub_serializer_json
+        Celix::pubsub_protocol_wire_v2
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_zmq
-        Celix::pubsub_protocol_wire_v2
         pubsub_sut
         pubsub_tst
         )
@@ -361,9 +361,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 PSA_ZMQ_ZEROCOPY_ENABLED=true
             BUNDLES
                 Celix::pubsub_serializer_json
+                Celix::pubsub_protocol_wire_v1
                 Celix::pubsub_topology_manager
                 Celix::pubsub_admin_zmq
-                Celix::pubsub_protocol_wire_v1
                 Celix::shell
                 Celix::shell_tui
                 pubsub_sut
@@ -384,9 +384,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
         PSA_ZMQ_ZEROCOPY_ENABLED=true
         BUNDLES
         Celix::pubsub_serializer_json
+        Celix::pubsub_protocol_wire_v2
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_zmq
-        Celix::pubsub_protocol_wire_v2
         Celix::shell
         Celix::shell_tui
         pubsub_sut
@@ -406,9 +406,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
diff --git a/bundles/pubsub/test/meta_data/ping2.properties b/bundles/pubsub/test/meta_data/ping2.properties
index 4b42836..ff0dbed 100644
--- a/bundles/pubsub/test/meta_data/ping2.properties
+++ b/bundles/pubsub/test/meta_data/ping2.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 tcp.static.bind.url=tcp://localhost:9500
-tcp.static.endpoint.type=server
+tcp.passive.key=tcp://localhost:9500
 
 #note only effective if run as root
 thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/pong2.properties b/bundles/pubsub/test/meta_data/pong2.properties
index fa55718..b95f3bc 100644
--- a/bundles/pubsub/test/meta_data/pong2.properties
+++ b/bundles/pubsub/test/meta_data/pong2.properties
@@ -14,8 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-tcp.static.connect.urls=tcp://localhost:9500;localhost:9501
-tcp.static.endpoint.type=client
+tcp.static.connect.urls=tcp://localhost:9500
+tcp.passive.key=tcp://localhost
 
 #note only effective if run as root
 thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
index 43d8a78..c208743 100644
--- a/bundles/pubsub/test/test/loopback_activator.c
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -42,7 +42,7 @@ struct activator {
 celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
 
 	char filter[512];
-	snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong2");
+	snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong3");
 	celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
 	opts.set = sut_pubSet;
 	opts.callbackHandle = act;
@@ -86,6 +86,7 @@ static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId
   msg_t *msg = voidMsg;
   msg_t send_msg = *msg;
   pthread_mutex_lock(&act->mutex);
+
   if (act->pubSvc != NULL) {
     if (act->count == 0) {
       act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
index 636b9a6..81e0326 100644
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.c
@@ -48,7 +48,7 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
 
     {
         celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping3");
         act->subSvc.handle = act;
         act->subSvc.receive = tst_receive;
         act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);