You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/05 13:03:06 UTC

[celix] branch feature/make_tcp_admin_msg_sending_robust_when_tcp_send_timeout_expires created (now 9cdf93b)

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a change to branch feature/make_tcp_admin_msg_sending_robust_when_tcp_send_timeout_expires
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at 9cdf93b  refactor read and write function

This branch includes the following new commits:

     new 9cdf93b  refactor read and write function

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[celix] 01/01: refactor read and write function

Posted by rb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/make_tcp_admin_msg_sending_robust_when_tcp_send_timeout_expires
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9cdf93bf575aa14323ee4ec45fae9b1d02c18492
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Fri Jun 5 15:02:05 2020 +0200

    refactor read and write function
---
 .../src/pubsub_psa_tcp_constants.h                 |   2 +
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 368 +++++++++++----------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   3 +-
 .../src/pubsub_tcp_topic_receiver.c                |   9 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |   5 +-
 5 files changed, 208 insertions(+), 179 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 3e7a7b3..6026212 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -61,9 +61,11 @@
 //Time-out settings are only for BLOCKING connections
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY       "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT   0.0
 
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT  5.0
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT  0.0
 
 #define PUBSUB_TCP_PSA_IP_KEY                   "PSA_IP"
 #define PUBSUB_TCP_ADMIN_TYPE                   "tcp"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 93f0358..7b13b6f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -76,25 +76,28 @@ typedef struct psa_tcp_connection_entry {
     struct sockaddr_in addr;
     socklen_t len;
     bool connected;
+    bool headerError;
     pubsub_protocol_message_t header;
     unsigned int syncSize;
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
     void *headerBuffer;
+    void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
+    unsigned int bufferReadSize;
     unsigned int metaBufferSize;
     void *metaBuffer;
     struct msghdr msg;
     size_t msg_iovlen;        /* Number of elements in the vector.  */
     unsigned int retryCount;
+    unsigned int seqNr;
 } psa_tcp_connection_entry_t;
 
 //
 // Handle administration
 //
 struct pubsub_tcpHandler {
-    unsigned int readSeqNr;
     celix_thread_rwlock_t dbLock;
     unsigned int timeout;
     hash_map_t *connection_url_map;
@@ -137,9 +140,9 @@ static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry
 
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
 
-static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
 
-static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
+static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
 
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
 
@@ -337,6 +340,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
             entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
             entry->msg_iovlen++;
         }
+        entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
         entry->buffer = calloc(sizeof(char), entry->bufferSize);
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
@@ -378,6 +382,12 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
             entry->headerBuffer = NULL;
             entry->headerBufferSize = 0;
         }
+
+        if (entry->footerBuffer) {
+            free(entry->footerBuffer);
+            entry->footerBuffer = NULL;
+        }
+
         if (entry->metaBuffer) {
             free(entry->metaBuffer);
             entry->metaBuffer = NULL;
@@ -438,7 +448,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
 #else
             struct epoll_event event;
             bzero(&event,  sizeof(struct epoll_event)); // zero the struct
-            event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+            event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
             event.data.fd = entry->fd;
             rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
 #endif
@@ -520,7 +530,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
                                       psa_tcp_connection_entry_t *entry) {
     int rc = 0;
     if (handle != NULL && entry != NULL) {
-        fprintf(stdout, "[TCP Socket] Close interface url: %s: \n", entry->url);
+        L_INFO("[TCP Socket] Close interface url: %s: \n", entry->url);
         hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd);
         if ((handle->efd >= 0)) {
 #if defined(__APPLE__)
@@ -546,8 +556,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
 //
 // Make accept file descriptor non blocking
 //
-static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
-                                                    int fd) {
+static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
     int rc = 0;
     int flags = fcntl(fd, F_GETFL, 0);
     if (flags == -1)
@@ -714,13 +723,9 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
                 sch.sched_priority = prio;
                 pthread_setschedparam(handle->thread.thread, policy, &sch);
             } else {
-                printf("Skipping configuration of thread prio to %i and thread "
+                L_INFO("Skipping configuration of thread prio to %i and thread "
                        "scheduling to %s. No permission\n",
                        (int) prio, sched);
-                celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_INFO,
-                              "Skipping configuration of thread prio to %i and thread "
-                              "scheduling to %s. No permission\n",
-                              (int) prio, sched);
             }
             celixThreadRwlock_unlock(&handle->dbLock);
         }
@@ -759,14 +764,51 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
+static inline
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
+    int expectedReadSize = size;
+    int nbytes = size;
+    int msgSize = 0;
+    char* buffer = (char*)_buffer;
+    while (nbytes > 0 && expectedReadSize > 0) {
+        // Read the message header
+        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
+        // Update buffer administration
+        offset += nbytes;
+        expectedReadSize -= nbytes;
+        msgSize += nbytes;
+    }
+    if (nbytes <=0)  msgSize = nbytes;
+    return msgSize;
+}
+
+
+static inline
+void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+
+  if (entry->header.header.payloadSize > 0) {
+    handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
+  }
+  if (entry->header.header.metadataSize > 0) {
+    handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
+                                     entry->header.header.metadataSize, &entry->header);
+  }
+  if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
+    struct timespec receiveTime;
+    clock_gettime(CLOCK_REALTIME, &receiveTime);
+    bool releaseEntryBuffer = false;
+    handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime);
+    if (releaseEntryBuffer) pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+  }
+}
+
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, bool *readMsg) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     celixThreadRwlock_writeLock(&handle->dbLock);
-    *index = 0;
-    *readMsg = false;
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
     if (entry == NULL)
         entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -784,93 +826,105 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
         handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
-        char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-        if (buffer) {
-            entry->buffer = buffer;
+        if (entry->buffer) free(entry->buffer);
+            entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
         }
-    }
-
     // Read the message
-    entry->msg.msg_iovlen = 0;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer
-                                                                                   : entry->buffer;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
-    entry->msg.msg_iovlen++;
-    int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
+    bool validMsg = false;
+    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
     if (nbytes > 0) {
-        entry->msg.msg_iovlen = 0;
-        if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) {
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-
-        } else if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                                  entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base,
-                                                  entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len, &entry->header) !=
-            CELIX_SUCCESS) {
-            entry->msg.msg_iov[0].iov_len = entry->syncSize;
-            nbytes = recvmsg(fd, &entry->msg, 0);
-            if (nbytes > 0)
-                entry->retryCount = 0;
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-        }
-        if (entry->header.header.payloadSize > entry->bufferSize) {
-            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-            char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-            if (buffer) {
-                entry->buffer = buffer;
-                entry->bufferSize = handle->bufferSize;
-            }
-        }
-        if (entry->header.header.metadataSize > entry->metaBufferSize) {
-            char *buffer = realloc(entry->metaBuffer, (size_t) entry->header.header.metadataSize);
-            if (buffer) {
-                entry->metaBuffer = buffer;
-                entry->metaBufferSize = entry->header.header.metadataSize;
-                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
-                       entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-            }
-        }
-
-        if (entry->headerBufferSize)
-            entry->msg.msg_iovlen++;
-        if (entry->header.header.payloadSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.payloadSize;
-            entry->msg.msg_iovlen++;
-        }
-        if (entry->header.header.metadataSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->metaBuffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.metadataSize;
-            entry->msg.msg_iovlen++;
-        }
-        nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL);
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
-            entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message header (fd: %d), error: %s. Retry count %u of %u,",
-                   entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+        // Check header message buffer
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           header_buffer,
+                                           entry->headerSize,
+                                           &entry->header) != CELIX_SUCCESS) {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
+            if (!entry->headerError) L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            entry->headerError = true;
+            entry->bufferReadSize = 0;
         } else {
-            L_ERROR(
-                "[TCP Socket] Failed to receive message header (fd: %d) after %u retries! Closing connection... Error: %s",
-                entry->fd,
-                handle->maxRcvRetryCount,
-                strerror(errno));
-            nbytes = 0; //Return 0 as indicator to close the connection
+            // Read header message from queue
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                entry->headerError = false;
+                // For headerless message, add header to bufferReadSize;
+                if (!entry->headerBufferSize)
+                    entry->bufferReadSize += nbytes;
+                // Alloc message buffers
+                if (entry->header.header.payloadSize > entry->bufferSize) {
+                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+                    if (entry->buffer)
+                        free(entry->buffer);
+                    entry->buffer = malloc((size_t) handle->bufferSize);
+                    entry->bufferSize = handle->bufferSize;
+                }
+                if (entry->header.header.metadataSize > entry->metaBufferSize) {
+                    if (entry->metaBuffer) {
+                        free(entry->metaBuffer);
+                        entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
+                        entry->bufferSize = handle->bufferSize;
+                        L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
+                               entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+                    }
+                }
+
+                if (entry->header.header.payloadSize) {
+                    unsigned int offset = entry->header.header.payloadOffset;
+                    unsigned int size = entry->header.header.payloadPartSize;
+                    // For header less messages adjust offset and msg size;
+                    if (!entry->headerBufferSize) {
+                        offset = entry->headerSize;
+                        size -= offset;
+                    }
+                    // Read payload data from queue
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
+                    if (nbytes > 0) {
+                        if (nbytes == size) {
+                            entry->bufferReadSize += nbytes;
+                        } else {
+                            entry->bufferReadSize = 0;
+                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                        }
+                    }
+                }
+                if (nbytes > 0 && entry->header.header.metadataSize) {
+                    // Read meta data from queue
+                    unsigned int size = entry->header.header.metadataSize;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
+                    if ((nbytes > 0) && (nbytes != size)) {
+                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                    }
+                }
+                // Check for end of message using, header of next message. Because of streaming protocol
+                // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+                if (nbytes > 0) {
+                    pubsub_protocol_message_t header;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
+                    if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                                 entry->footerBuffer,
+                                                 entry->headerSize,
+                                                 &header) == CELIX_SUCCESS) {
+                        // valid header for next buffer, this means that the message is valid
+                        validMsg = true;
+                    } else {
+                        // Did not receive correct header
+                        L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+                        entry->bufferReadSize = 0;
+                    }
+                }
+            }
         }
     }
     if (nbytes > 0) {
         entry->retryCount = 0;
-        unsigned int msgSize = 0;
-        for (int i = 0; i < entry->msg.msg_iovlen; i++) {
-            msgSize += entry->msg.msg_iov[i].iov_len;
-        }
-        if (nbytes == msgSize) {
-            *readMsg = true;
-        } else {
-            L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) nbytes : %d = msgSize %d", entry->fd,
-                    nbytes, msgSize);
+        // Check if complete message is received
+        if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
+            entry->bufferReadSize = 0;
+            pubsub_tcpHandler_decodePayload(handle, entry);
         }
     } else {
         if (entry->retryCount < handle->maxRcvRetryCount) {
@@ -887,34 +941,6 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
     return nbytes;
 }
 
-//
-// Read out the message which is indicated available by the largeUdp_dataAvailable function
-//
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__ ((__unused__)),
-                           pubsub_protocol_message_t **header) {
-    int result = 0;
-    celixThreadRwlock_readLock(&handle->dbLock);
-    psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);;
-    if (entry == NULL)
-        entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
-    if (entry == NULL)
-        result = -1;
-    if (entry)
-        result = (!entry->connected) ? -1 : result;
-    if (!result) {
-        if (entry->header.header.payloadSize > 0) {
-            handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize,
-                                            &entry->header);
-        }
-        if (entry->header.header.metadataSize > 0) {
-            handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
-                                             entry->header.header.metadataSize, &entry->header);
-        }
-        *header = &entry->header;
-    }
-    celixThreadRwlock_unlock(&handle->dbLock);
-    return result;
-}
 
 int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
                                         pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
@@ -950,6 +976,44 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
     return result;
 }
 
+static inline
+int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
+  int nbytes = 0;
+  int msgSize = 0;
+  if (entry->fd >= 0 && size && msg->msg_iovlen) {
+    int expectedReadSize = size;
+    unsigned int offset = 0;
+    nbytes = size;
+    while (nbytes > 0 && expectedReadSize > 0) {
+      // Read the message header
+      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
+      // Update admin
+      expectedReadSize -= nbytes;
+      msgSize += nbytes;
+      // Not all written
+      if (expectedReadSize && nbytes > 0) {
+        unsigned int readSize = 0;
+        unsigned int readIndex = 0;
+        unsigned int i = 0;
+        for (i = 0; i < msg->msg_iovlen; i++) {
+          if (nbytes < msg->msg_iov[i].iov_len) {
+            readIndex = i;
+            break;
+          }
+          readSize+= msg->msg_iov[i].iov_len;
+        }
+        msg->msg_iov = &msg->msg_iov[readIndex];
+        msg->msg_iovlen -= readIndex;
+        char* buffer = (char*)msg->msg_iov->iov_base;
+        offset = nbytes - readSize;
+        msg->msg_iov->iov_base = &buffer[offset];
+        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
+      }
+    }
+  }
+  if (nbytes <=0)  msgSize = nbytes;
+  return msgSize;
+}
 //
 // Write large data to TCP. .
 //
@@ -963,6 +1027,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->connected) continue;
             void *payloadData = NULL;
             size_t payloadSize = 0;
             if (msg_iov_len == 1) {
@@ -973,6 +1038,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 }
             }
 
+            message->header.seqNr = entry->seqNr;
             message->header.payloadSize = payloadSize;
             message->header.payloadPartSize = payloadSize;
             message->header.payloadOffset = 0;
@@ -987,15 +1053,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             message->header.metadataSize = metadataSize;
 
             size_t msgSize = 0;
-            long int nbytes = 0;
             struct msghdr msg;
             struct iovec msg_iov[IOV_MAX];
+            memset(&msg, 0x00, sizeof(struct msghdr));
             msg.msg_name = &entry->addr;
             msg.msg_namelen = entry->len;
             msg.msg_flags = flags;
-            msg.msg_control = NULL;
-            msg.msg_controllen = 0;
-            msg.msg_iovlen = 0;
             msg.msg_iov = msg_iov;
 
             // Write generic seralized payload in vector buffer
@@ -1044,10 +1107,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
               L_ERROR("[TCP Socket] No header buffer is generated");
               msg.msg_iovlen = 0;
             }
-            nbytes = 0;
-            if (entry->fd >= 0 && msgSize && msg.msg_iovlen) {
-                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
-            }
+            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
             //  When a specific socket keeps reporting errors can indicate a subscriber
             //  which is not active anymore, the connection will remain until the retry
             //  counter exceeds the maximum retry count.
@@ -1068,7 +1128,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             } else if (msgSize) {
                 entry->retryCount = 0;
                 if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket]  MsgSize not correct: %d != %d\n", msgSize, nbytes);
+                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", entry->seqNr, msgSize, nbytes,  strerror(errno));
                 }
             }
             // Release data
@@ -1082,6 +1142,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             if (metadataData) {
                 free(metadataData);
             }
+            entry->seqNr++;
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
@@ -1168,51 +1229,10 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
 }
 
 //
-// Handle sockets reads (blocking)
-//
-static inline
-void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd) {
-    unsigned int index = 0;
-    bool readMsg = false;
-    int rc = pubsub_tcpHandler_dataAvailable(handle, fd, &index, &readMsg);
-    if (rc <= 0) {
-        // close connection.
-        if (rc == 0)
-            pubsub_tcpHandler_close(handle, fd);
-        return;
-    }
-    if (readMsg) {
-        // Handle data
-        pubsub_protocol_message_t *header = NULL;
-        rc = pubsub_tcpHandler_read(handle, fd, index, &header);
-        if (rc < 0)
-            return;
-        celixThreadRwlock_readLock(&handle->dbLock);
-        if (handle->processMessageCallback && header != NULL && header->payload.payload != NULL &&
-            header->payload.length) {
-            struct timespec receiveTime;
-            clock_gettime(CLOCK_REALTIME, &receiveTime);
-            bool releaseEntryBuffer = false;
-            handle->processMessageCallback(handle->processMessagePayload, header, &releaseEntryBuffer, &receiveTime);
-            if (releaseEntryBuffer)
-                pubsub_tcpHandler_releaseEntryBuffer(handle, fd, index);
-        }
-        celixThreadRwlock_unlock(&handle->dbLock);
-    }
-}
-
-//
 // Handle sockets connection (sender)
 //
 static inline
 void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) {
-    int err = 0;
-    socklen_t len = sizeof(int);
-    int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
-    if (rc != 0) {
-        L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
-        return;
-    }
     celixThreadRwlock_readLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
     if (entry)
@@ -1255,7 +1275,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
         int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
         pubsub_tcpHandler_connectionHandler(handle, fd);
       } else if (events[i].filter & EVFILT_READ) {
-        pubsub_tcpHandler_readHandler(handle, events[i].ident);
+        int rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
+        if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
       } else if (events[i].flags & EV_EOF) {
         int err = 0;
         socklen_t len = sizeof(int);
@@ -1304,7 +1325,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
                pubsub_tcpHandler_connectionHandler(handle, fd);
             } else if (events[i].events & EPOLLIN) {
-                pubsub_tcpHandler_readHandler(handle, events[i].data.fd);
+                rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
+                if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
             } else if (events[i].events & EPOLLRDHUP) {
                 int err = 0;
                 socklen_t len = sizeof(int);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index 260edc1..ed4581c 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -68,8 +68,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, bool *readMsg);
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_protocol_message_t **header);
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
                             pubsub_protocol_message_t *message,
                             struct iovec *msg_iovec,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 8cbf8fc..0bd51c5 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -157,6 +157,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
+    bool isEndpoint = false;
     bool isServerEndPoint = false;
 
     /* Check if it's a static endpoint */
@@ -167,6 +168,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
         const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
         if (endPointType != NULL) {
+            isEndpoint = true;
             if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
                         strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
                 staticClientEndPointUrls = staticConnectUrls;
@@ -207,8 +209,9 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
-        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,
-                                                         PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
+        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, 
+                                                        (!isEndpoint) ? PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT :
+                                                                        PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT);
         long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
                                                               PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
         long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
@@ -789,4 +792,4 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t
     }
 
     return check;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 69c862a..47dc888 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -145,6 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     }
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
                                                                    PSA_TCP_DEFAULT_METRICS_ENABLED);
+    bool isEndpoint = false;
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
     const char *discUrl = NULL;
@@ -155,6 +156,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         /* Check if it's a static endpoint */
         const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
         if (endPointType != NULL) {
+            isEndpoint = true;
             if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
                         strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
                 staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
@@ -192,7 +194,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
         double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                      PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
+                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);