You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/02/21 11:01:20 UTC

[celix] branch develop updated: Feature/tcp admin improvements (#154)

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new 99c7487  Feature/tcp admin improvements (#154)
99c7487 is described below

commit 99c7487b02d2097fd4efed2c16de8bbc669b6b33
Author: dhbfischer <52...@users.noreply.github.com>
AuthorDate: Fri Feb 21 12:01:13 2020 +0100

    Feature/tcp admin improvements (#154)
    
    * Improved send and receive handling for TCP admin
    
    * Undo unnecessary print
    
    * Reset recv failure retry counter
---
 .../src/pubsub_psa_tcp_constants.h                 |  13 ++
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 159 +++++++++++++++------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   4 +
 .../src/pubsub_tcp_topic_receiver.c                |   4 +
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |   6 +-
 5 files changed, 142 insertions(+), 44 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index d0c0ebd..7eb1171 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -53,9 +53,22 @@
 #define PUBSUB_TCP_PUBLISHER_BLOCKING_KEY       "PUBSUB_TCP_PUBLISHER_BLOCKING"
 #define PUBSUB_TCP_PUBLISHER_BLOCKING_DEFAULT   true
 
+#define PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY      "PUBSUB_TCP_PUBLISHER_RETRY_COUNT"
+#define PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT  5
+
+#define PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY     "PUBSUB_TCP_SUBSCRIBER_RETRY_COUNT"
+#define PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT 5
+
 #define PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY      "PUBSUB_TCP_SUBSCRIBER_BLOCKING"
 #define PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT   true
 
+//Time-out settings are only for BLOCKING connections
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY       "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   5.0
+
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT  5.0
+
 #define PUBSUB_TCP_PSA_IP_KEY                   "PSA_IP"
 #define PUBSUB_TCP_PSA_ITF_KEY                  "PSA_INTERFACE"
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index cfe3e31..1cbcadf 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -78,6 +78,7 @@ typedef struct psa_tcp_connection_entry {
     int expectedReadSize;
     int readState;
     pubsub_tcp_msg_header_t header;
+    unsigned int retryCount;
 
 } psa_tcp_connection_entry_t;
 
@@ -101,6 +102,10 @@ struct pubsub_tcpHandler {
   log_helper_t *logHelper;
   unsigned int bufferSize;
   unsigned int maxNofBuffer;
+  unsigned int maxSendRetryCount;
+  unsigned int maxRcvRetryCount;
+  double sendTimeout;
+  double rcvTimeout;
   psa_tcp_connection_entry_t own;
 };
 
@@ -130,6 +135,8 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper) {
         handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
         handle->maxNofBuffer = 1; // Reserved for future Use;
         handle->useBlockingWrite = true;
+        handle->sendTimeout = 0.0;
+        handle->rcvTimeout = 0.0;
         pubsub_tcpHandler_setupEntry(&handle->own, -1, NULL, MAX_DEFAULT_BUFFER_SIZE);
         celixThreadRwlock_create(&handle->dbLock, 0);
         //signal(SIGPIPE, SIG_IGN);
@@ -142,7 +149,7 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper) {
 // Destroys the handle
 //
 void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
-    printf("### Destroying BufferHandler TCP\n");
+    L_INFO("### Destroying BufferHandler TCP");
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
         pubsub_tcpHandler_close(handle);
@@ -188,6 +195,24 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
         } else {
             L_ERROR("[TCP Socket] Error creating socket: %s\n", strerror(errno));
         }
+        if (rc == 0 && handle->sendTimeout != 0.0) {
+            struct timeval tv;
+            tv.tv_sec = (long int) handle->sendTimeout;
+            tv.tv_usec = (long int) ((handle->sendTimeout - tv.tv_sec) * 1000000.0);
+            rc = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+            if(rc != 0) {
+                L_ERROR("[TCP Socket] Error setsockopt (SO_SNDTIMEO) to set send timeout: %s", strerror(errno));
+            }
+        }
+        if (rc == 0 && handle->rcvTimeout != 0.0) {
+            struct timeval tv;
+            tv.tv_sec = (long int) handle->rcvTimeout;
+            tv.tv_usec = (long int) ((handle->rcvTimeout - tv.tv_sec) * 1000000.0);
+            rc = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+            if(rc != 0) {
+                L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
+            }
+        }
         struct sockaddr_in addr; // connector's address information
         pubsub_tcpHandler_url_t url_info;
         pubsub_tcpHandler_setUrlInfo(url, &url_info);
@@ -284,7 +309,6 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Cannot connect to %s:%d: err: %s\n", url_info.hostname, url_info.portnr,
                         strerror(errno));
                 close(fd);
-                errno = 0;
             } else {
                 struct sockaddr_in sin;
                 socklen_t len = sizeof(sin);
@@ -294,7 +318,6 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 rc = getsockname(fd, (struct sockaddr *) &sin, &len);
                 if (rc < 0) {
                     L_ERROR("[TCP Socket] getsockname %s\n", strerror(errno));
-                    errno = 0;
                 } else if (handle->own.url == NULL) {
                     char *address = inet_ntoa(sin.sin_addr);
                     unsigned int port = ntohs(sin.sin_port);
@@ -313,7 +336,6 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 pubsub_tcpHandler_freeEntry(entry);
                 free(entry);
                 L_ERROR("[TCP Socket] Cannot create epoll %s\n", strerror(errno));
-                errno = 0;
                 entry = NULL;
             }
         }
@@ -335,7 +357,11 @@ int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url) {
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
         psa_tcp_connection_entry_t *entry = hashMap_remove(handle->url_map, url);
-        pubsub_tcpHandler_closeConnectionEntry(handle, entry, false);
+        if (entry != NULL) {
+            pubsub_tcpHandler_closeConnectionEntry(handle, entry, false);
+        } else {
+            L_ERROR("[PSA TCP] Did not found a connection with '%s'", url);
+        }
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return rc;
@@ -347,7 +373,7 @@ int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url) {
 static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock) {
     int rc = 0;
     if (handle != NULL && entry != NULL) {
-        fprintf(stdout, "[TCP Socket] Close connection to url: %s: \n", entry->url);
+        L_INFO("[TCP Socket] Close connection to url: %s: ", entry->url);
         hashMap_remove(handle->fd_map, (void *) (intptr_t) entry->fd);
         if ((handle->efd >= 0)) {
             struct epoll_event event;
@@ -355,7 +381,6 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *ha
             rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
             if (rc < 0) {
                 L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
-                errno = 0;
             }
         }
         if (entry->fd >= 0) {
@@ -404,7 +429,6 @@ int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
     rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     if (rc < 0) {
       L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
-      errno = 0;
     }
   }
   return rc;
@@ -421,7 +445,6 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
         if (rc != 0) {
             L_ERROR("[TCP Socket] Error listen: %s\n", strerror(errno));
             pubsub_tcpHandler_freeEntry(&handle->own);
-            errno = 0;
         }
     }
     if (rc >= 0) {
@@ -439,7 +462,6 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event);
         if (rc < 0) {
             L_ERROR("[TCP Socket] Cannot create epoll: %s\n",strerror(errno));
-            errno = 0;
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
@@ -457,7 +479,6 @@ int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle, const char *hostnam
             hp = gethostbyname(hostname);
             if (hp == NULL) {
                 L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s, %s\n", hostname, strerror(errno));
-                errno = 0;
                 return -1;
             }
             inp->sin_addr = *(struct in_addr *) hp->h_addr;
@@ -557,6 +578,40 @@ void pubsub_tcpHandler_setBlockingRead(pubsub_tcpHandler_t *handle, bool blockin
     }
 }
 
+
+void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxSendRetryCount = count;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
+
+void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxRcvRetryCount = count;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
+void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->sendTimeout = timeout;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
+void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->rcvTimeout = timeout;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
@@ -591,7 +646,7 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
                 if (buffer) {
                     entry->buffer = buffer;
                     entry->bufferSize = entry->expectedReadSize;
-                    L_WARN("[TCP Socket: %d, url: %s,  realloc read buffer: (%d, %d) \n", entry->fd, entry->url,
+                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read buffer: (%d, %d) \n", entry->fd, entry->url,
                            entry->bufferSize, entry->expectedReadSize);
                 }
             }
@@ -603,11 +658,19 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
     }
     // Read the message
     int nbytes = recv(fd, &entry->buffer[entry->bufferReadSize], entry->expectedReadSize, 0);
+    // Handle socket errors (nbytes < 0); nbytes == 0 indicates that the connection is lost
     if (nbytes < 0) {
-        // Handle Socket error, when nbytes == 0 => Connection is lost
-        if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {}
-        else L_ERROR("[TCP Socket] read error %s\n", strerror(errno));
-        errno = 0;
+        if(entry->retryCount < handle->maxRcvRetryCount) {
+            entry->retryCount++;
+            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,",
+                   entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+        } else {
+            L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
+                    entry->fd, handle->maxRcvRetryCount, strerror(errno));
+            nbytes = 0; //Return 0 as indicator to close the connection
+        }
+    } else {
+        entry->retryCount = 0;
     }
     if ((!handle->bypassHeader) && (nbytes > 0)) {
         // Update buffer administration
@@ -653,14 +716,11 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
                 // Set data read size
                 entry->expectedReadSize = pHeader->bufferSize;
                 entry->readState++;
-                // The data is read, update administation and set state to READ_STATE_READY
+                // The data is read, update administration and set state to READ_STATE_READY
             } else if (entry->readState == READ_STATE_DATA) {
                 handle->readSeqNr = pHeader->seqNr;
                 //fprintf(stdout, "ReadSeqNr: Count: %d\n", handle->readSeqNr);
                 nbytes = entry->bufferReadSize - sizeof(pubsub_tcp_msg_header_t);
-                if (nbytes == 0) {
-                    errno = 0;
-                }
                 // if buffer does not contain header, reset buffer
                 if (nbytes < 0) {
                     L_ERROR("[TCP Socket] incomplete message\n");
@@ -750,7 +810,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
                             unsigned int size, int flags) {
     celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
-    int written = 0;
+    int connFdCloseQueue[hashMap_size(handle->fd_map)];
+    int nofConnToClose = 0;
     header->marker_start = MARKER_START_PATTERN;
     header->marker_end   = MARKER_END_PATTERN;
     header->bufferSize   = size;
@@ -759,6 +820,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
     while (hashMapIterator_hasNext(&iter)) {
         psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
 
+        size_t msgSize = 0;
         // struct iovec *largeMsg_iovec, int len, ,
         struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
         struct msghdr msg;
@@ -775,34 +837,48 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
             msg.msg_iov[1].iov_base = buffer;
             msg.msg_iov[1].iov_len = size;
             msg.msg_iovlen = 2;
+            msgSize = msg.msg_iov[0].iov_len + msg.msg_iov[1].iov_len;
+
         } else {
             msg.msg_iov[0].iov_base = buffer;
             msg.msg_iov[0].iov_len = size;
             msg.msg_iovlen = 1;
+            msgSize = msg.msg_iov[0].iov_len;
         }
 
-        int nbytes = 0;
+        long int nbytes = 0;
         if (entry->fd >= 0) nbytes = sendmsg(entry->fd, &msg, MSG_NOSIGNAL);
-        //  Several errors are OK. When speculative write is being done we may not
-        //  be able to write a single byte to the socket buffer. (socket buffer full)
-        //  In this case when socket is not blocking, exit write function.
+        //  A socket that keeps reporting errors can indicate a subscriber that is
+        //  no longer active; the connection is kept until the retry counter
+        //  exceeds the maximum retry count.
         //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
         if (nbytes == -1) {
-            result = ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) ? 0 : -1;
-            L_ERROR("[TCP Socket] Seq_Id: %d Cannot send msg %s\n", header->seqNr, strerror(errno));
-            errno = 0;
-        }
-        int msgSize = 0;
-        for (int i = 0; i < msg.msg_iovlen; i++) {
-            msgSize+=msg.msg_iov[i].iov_len;
-        }
-        if (nbytes != msgSize) {
-            L_ERROR("[TCP Socket] Seq; %d, MsgSize not correct: %d != %d (BufferSize: %d \n", header->seqNr, msgSize, nbytes, header->bufferSize);
+            if(entry->retryCount < handle->maxSendRetryCount) {
+                entry->retryCount++;
+                L_WARN("[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, "
+                       "buffer size: %u",
+                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount, header->bufferSize);
+            } else {
+                L_ERROR("[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
+                        entry->fd, handle->maxSendRetryCount, strerror(errno));
+                connFdCloseQueue[nofConnToClose++] = entry->fd;
+            }
+            result = -1; //At least one connection failed sending
+        } else {
+            entry->retryCount = 0;
+            if (nbytes != msgSize) {
+                L_ERROR("[TCP Socket] Seq; %d, MsgSize not correct: %d != %d (BufferSize: %d)\n",
+                        header->seqNr, msgSize, nbytes, header->bufferSize);
+            }
         }
-        written = (result == 0) ? written + nbytes : written;
     }
     celixThreadRwlock_unlock(&handle->dbLock);
-    return (result == 0 ? written : result);
+
+    //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
+    for (int i = 0; i < nofConnToClose; i++) {
+        pubsub_tcpHandler_closeConnection(handle, connFdCloseQueue[i]);
+    }
+    return result;
 }
 
 const char *pubsub_tcpHandler_url(pubsub_tcpHandler_t *handle) {
@@ -818,7 +894,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
         if (nof_events < 0) {
             if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {}
             else L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
-            errno = 0;
         }
         for (int i = 0; i < nof_events; i++) {
             if ((handle->own.fd >= 0) && (events[i].data.fd == handle->own.fd)) {
@@ -830,7 +905,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                 rc = fd;
                 if (rc == -1) {
                   L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
-                  errno = 0;
                 }
                 // Make file descriptor NonBlocking
                 if ((!handle->useBlockingWrite) && (rc >= 0)) {
@@ -851,6 +925,7 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                     entry->addr = their_addr;
                     entry->len  = len;
                     entry->connected = false;
+                    entry->retryCount = 0;
                     event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
                     event.data.fd = entry->fd;
                     // Register Read to epoll
@@ -910,7 +985,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                 rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
                 if (rc != 0) {
                     L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
-                    errno = 0;
                     continue;
                 }
                 celixThreadRwlock_readLock(&handle->dbLock);
@@ -936,14 +1010,13 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                 socklen_t len = sizeof(int);
                 rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
                 if (rc != 0) {
-                    L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n",strerror(errno));
-                    errno = 0;
+                    L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
                     continue;
                 }
                 pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
             } else if (events[i].events & EPOLLERR) {
                 L_ERROR("[TCP Socket]:EPOLLERR  ERROR read from socket %s\n",strerror(errno));
-                errno = 0;
+                pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
                 continue;
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index f1be060..cf64144 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -60,6 +60,10 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int time
 void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned int msgIdOffset, unsigned int msgIdSize);
 void pubsub_tcpHandler_setBlockingWrite(pubsub_tcpHandler_t *handle, bool blocking);
 void pubsub_tcpHandler_setBlockingRead(pubsub_tcpHandler_t *handle, bool blocking);
+void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
+void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
+void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 
 int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size);
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 23af6c3..9b4af40 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -198,8 +198,12 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
         long msgIdOffset  = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
         long msgIdSize    = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE,   PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+        long retryCnt     = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
+        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
         pubsub_tcpHandler_setBypassHeader(receiver->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
         pubsub_tcpHandler_setBlockingRead(receiver->socketHandler, blocking);
+        pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
+        pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
     }
 
     psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 6795af1..ba7fbb3 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -144,8 +144,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
         long msgIdOffset  = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
         long msgIdSize    = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE,   PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+        long retryCnt     = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double timeout    = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
         pubsub_tcpHandler_setBypassHeader(sender->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
         pubsub_tcpHandler_setBlockingWrite(sender->socketHandler, blocking);
+        pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
     }
     /* Check if it's a static endpoint */
     bool isEndPointTypeClient = false;
@@ -537,7 +541,7 @@ static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, co
                 sendCountUpdate = 1;
             } else {
                 sendErrorUpdate = 1;
-                L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno));
+                L_WARN("[PSA_TCP_TS] Error sending tcp.");
             }
         } else {
             serializationErrorUpdate = 1;