You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2020/02/21 11:01:20 UTC
[celix] branch develop updated: Feature/tcp admin improvements
(#154)
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new 99c7487 Feature/tcp admin improvements (#154)
99c7487 is described below
commit 99c7487b02d2097fd4efed2c16de8bbc669b6b33
Author: dhbfischer <52...@users.noreply.github.com>
AuthorDate: Fri Feb 21 12:01:13 2020 +0100
Feature/tcp admin improvements (#154)
* Improved send and receive handling for TCP admin
* Undo unnecessary print
* Reset recv failure retry counter
---
.../src/pubsub_psa_tcp_constants.h | 13 ++
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 159 +++++++++++++++------
.../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 4 +
.../src/pubsub_tcp_topic_receiver.c | 4 +
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 6 +-
5 files changed, 142 insertions(+), 44 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index d0c0ebd..7eb1171 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -53,9 +53,22 @@
#define PUBSUB_TCP_PUBLISHER_BLOCKING_KEY "PUBSUB_TCP_PUBLISHER_BLOCKING"
#define PUBSUB_TCP_PUBLISHER_BLOCKING_DEFAULT true
+#define PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY "PUBSUB_TCP_PUBLISHER_RETRY_COUNT"
+#define PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT 5
+
+#define PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY "PUBSUB_TCP_SUBSCRIBER_RETRY_COUNT"
+#define PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT 5
+
#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY "PUBSUB_TCP_SUBSCRIBER_BLOCKING"
#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT true
+//Time-out settings are only for BLOCKING connections
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 5.0
+
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0
+
#define PUBSUB_TCP_PSA_IP_KEY "PSA_IP"
#define PUBSUB_TCP_PSA_ITF_KEY "PSA_INTERFACE"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index cfe3e31..1cbcadf 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -78,6 +78,7 @@ typedef struct psa_tcp_connection_entry {
int expectedReadSize;
int readState;
pubsub_tcp_msg_header_t header;
+ unsigned int retryCount;
} psa_tcp_connection_entry_t;
@@ -101,6 +102,10 @@ struct pubsub_tcpHandler {
log_helper_t *logHelper;
unsigned int bufferSize;
unsigned int maxNofBuffer;
+ unsigned int maxSendRetryCount;
+ unsigned int maxRcvRetryCount;
+ double sendTimeout;
+ double rcvTimeout;
psa_tcp_connection_entry_t own;
};
@@ -130,6 +135,8 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper) {
handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
handle->maxNofBuffer = 1; // Reserved for future Use;
handle->useBlockingWrite = true;
+ handle->sendTimeout = 0.0;
+ handle->rcvTimeout = 0.0;
pubsub_tcpHandler_setupEntry(&handle->own, -1, NULL, MAX_DEFAULT_BUFFER_SIZE);
celixThreadRwlock_create(&handle->dbLock, 0);
//signal(SIGPIPE, SIG_IGN);
@@ -142,7 +149,7 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper) {
// Destroys the handle
//
void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
- printf("### Destroying BufferHandler TCP\n");
+ L_INFO("### Destroying BufferHandler TCP");
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
pubsub_tcpHandler_close(handle);
@@ -188,6 +195,24 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
} else {
L_ERROR("[TCP Socket] Error creating socket: %s\n", strerror(errno));
}
+ if (rc == 0 && handle->sendTimeout != 0.0) {
+ struct timeval tv;
+ tv.tv_sec = (long int) handle->sendTimeout;
+ tv.tv_usec = (long int) ((handle->sendTimeout - tv.tv_sec) * 1000000.0);
+ rc = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+ if(rc != 0) {
+ L_ERROR("[TCP Socket] Error setsockopt (SO_SNDTIMEO) to set send timeout: %s", strerror(errno));
+ }
+ }
+ if (rc == 0 && handle->rcvTimeout != 0.0) {
+ struct timeval tv;
+ tv.tv_sec = (long int) handle->rcvTimeout;
+ tv.tv_usec = (long int) ((handle->rcvTimeout - tv.tv_sec) * 1000000.0);
+ rc = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+ if(rc != 0) {
+ L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
+ }
+ }
struct sockaddr_in addr; // connector's address information
pubsub_tcpHandler_url_t url_info;
pubsub_tcpHandler_setUrlInfo(url, &url_info);
@@ -284,7 +309,6 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
L_ERROR("[TCP Socket] Cannot connect to %s:%d: err: %s\n", url_info.hostname, url_info.portnr,
strerror(errno));
close(fd);
- errno = 0;
} else {
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
@@ -294,7 +318,6 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
rc = getsockname(fd, (struct sockaddr *) &sin, &len);
if (rc < 0) {
L_ERROR("[TCP Socket] getsockname %s\n", strerror(errno));
- errno = 0;
} else if (handle->own.url == NULL) {
char *address = inet_ntoa(sin.sin_addr);
unsigned int port = ntohs(sin.sin_port);
@@ -313,7 +336,6 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
pubsub_tcpHandler_freeEntry(entry);
free(entry);
L_ERROR("[TCP Socket] Cannot create epoll %s\n", strerror(errno));
- errno = 0;
entry = NULL;
}
}
@@ -335,7 +357,11 @@ int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_remove(handle->url_map, url);
- pubsub_tcpHandler_closeConnectionEntry(handle, entry, false);
+ if (entry != NULL) {
+ pubsub_tcpHandler_closeConnectionEntry(handle, entry, false);
+ } else {
+ L_ERROR("[PSA TCP] Did not found a connection with '%s'", url);
+ }
celixThreadRwlock_unlock(&handle->dbLock);
}
return rc;
@@ -347,7 +373,7 @@ int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url) {
static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock) {
int rc = 0;
if (handle != NULL && entry != NULL) {
- fprintf(stdout, "[TCP Socket] Close connection to url: %s: \n", entry->url);
+ L_INFO("[TCP Socket] Close connection to url: %s: ", entry->url);
hashMap_remove(handle->fd_map, (void *) (intptr_t) entry->fd);
if ((handle->efd >= 0)) {
struct epoll_event event;
@@ -355,7 +381,6 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *ha
rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
if (rc < 0) {
L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
- errno = 0;
}
}
if (entry->fd >= 0) {
@@ -404,7 +429,6 @@ int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
- errno = 0;
}
}
return rc;
@@ -421,7 +445,6 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
if (rc != 0) {
L_ERROR("[TCP Socket] Error listen: %s\n", strerror(errno));
pubsub_tcpHandler_freeEntry(&handle->own);
- errno = 0;
}
}
if (rc >= 0) {
@@ -439,7 +462,6 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event);
if (rc < 0) {
L_ERROR("[TCP Socket] Cannot create epoll: %s\n",strerror(errno));
- errno = 0;
}
}
celixThreadRwlock_unlock(&handle->dbLock);
@@ -457,7 +479,6 @@ int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle, const char *hostnam
hp = gethostbyname(hostname);
if (hp == NULL) {
L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s, %s\n", hostname, strerror(errno));
- errno = 0;
return -1;
}
inp->sin_addr = *(struct in_addr *) hp->h_addr;
@@ -557,6 +578,40 @@ void pubsub_tcpHandler_setBlockingRead(pubsub_tcpHandler_t *handle, bool blockin
}
}
+
+void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->maxSendRetryCount = count;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+}
+
+
+void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->maxRcvRetryCount = count;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+}
+
+void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->sendTimeout = timeout;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+}
+
+void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->rcvTimeout = timeout;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+}
+
//
// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and size have valid values
@@ -591,7 +646,7 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
if (buffer) {
entry->buffer = buffer;
entry->bufferSize = entry->expectedReadSize;
- L_WARN("[TCP Socket: %d, url: %s, realloc read buffer: (%d, %d) \n", entry->fd, entry->url,
+ L_WARN("[TCP Socket] socket: %d, url: %s, realloc read buffer: (%d, %d) \n", entry->fd, entry->url,
entry->bufferSize, entry->expectedReadSize);
}
}
@@ -603,11 +658,19 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
}
// Read the message
int nbytes = recv(fd, &entry->buffer[entry->bufferReadSize], entry->expectedReadSize, 0);
+ // Handle Socket error, when nbytes == 0 => Connection is lost
if (nbytes < 0) {
- // Handle Socket error, when nbytes == 0 => Connection is lost
- if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {}
- else L_ERROR("[TCP Socket] read error %s\n", strerror(errno));
- errno = 0;
+ if(entry->retryCount < handle->maxRcvRetryCount) {
+ entry->retryCount++;
+ L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,",
+ entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+ } else {
+ L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
+ entry->fd, handle->maxRcvRetryCount, strerror(errno));
+ nbytes = 0; //Return 0 as indicator to close the connection
+ }
+ } else {
+ entry->retryCount = 0;
}
if ((!handle->bypassHeader) && (nbytes > 0)) {
// Update buffer administration
@@ -653,14 +716,11 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
// Set data read size
entry->expectedReadSize = pHeader->bufferSize;
entry->readState++;
- // The data is read, update administation and set state to READ_STATE_READY
+ // The data is read, update administration and set state to READ_STATE_READY
} else if (entry->readState == READ_STATE_DATA) {
handle->readSeqNr = pHeader->seqNr;
//fprintf(stdout, "ReadSeqNr: Count: %d\n", handle->readSeqNr);
nbytes = entry->bufferReadSize - sizeof(pubsub_tcp_msg_header_t);
- if (nbytes == 0) {
- errno = 0;
- }
// if buffer does not contain header, reset buffer
if (nbytes < 0) {
L_ERROR("[TCP Socket] incomplete message\n");
@@ -750,7 +810,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
unsigned int size, int flags) {
celixThreadRwlock_readLock(&handle->dbLock);
int result = 0;
- int written = 0;
+ int connFdCloseQueue[hashMap_size(handle->fd_map)];
+ int nofConnToClose = 0;
header->marker_start = MARKER_START_PATTERN;
header->marker_end = MARKER_END_PATTERN;
header->bufferSize = size;
@@ -759,6 +820,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ size_t msgSize = 0;
// struct iovec *largeMsg_iovec, int len, ,
struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
struct msghdr msg;
@@ -775,34 +837,48 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
msg.msg_iov[1].iov_base = buffer;
msg.msg_iov[1].iov_len = size;
msg.msg_iovlen = 2;
+ msgSize = msg.msg_iov[0].iov_len + msg.msg_iov[1].iov_len;
+
} else {
msg.msg_iov[0].iov_base = buffer;
msg.msg_iov[0].iov_len = size;
msg.msg_iovlen = 1;
+ msgSize = msg.msg_iov[0].iov_len;
}
- int nbytes = 0;
+ long int nbytes = 0;
if (entry->fd >= 0) nbytes = sendmsg(entry->fd, &msg, MSG_NOSIGNAL);
- // Several errors are OK. When speculative write is being done we may not
- // be able to write a single byte to the socket buffer. (socket buffer full)
- // In this case when socket is not blocking, exit write function.
+ // When a specific socket keeps reporting errors can indicate a subscriber
+ // which is not active anymore, the connection will remain until the retry
+ // counter exceeds the maximum retry count.
// Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
if (nbytes == -1) {
- result = ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) ? 0 : -1;
- L_ERROR("[TCP Socket] Seq_Id: %d Cannot send msg %s\n", header->seqNr, strerror(errno));
- errno = 0;
- }
- int msgSize = 0;
- for (int i = 0; i < msg.msg_iovlen; i++) {
- msgSize+=msg.msg_iov[i].iov_len;
- }
- if (nbytes != msgSize) {
- L_ERROR("[TCP Socket] Seq; %d, MsgSize not correct: %d != %d (BufferSize: %d \n", header->seqNr, msgSize, nbytes, header->bufferSize);
+ if(entry->retryCount < handle->maxSendRetryCount) {
+ entry->retryCount++;
+ L_WARN("[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, "
+ "buffer size: %u",
+ entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount, header->bufferSize);
+ } else {
+ L_ERROR("[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
+ entry->fd, handle->maxSendRetryCount, strerror(errno));
+ connFdCloseQueue[nofConnToClose++] = entry->fd;
+ }
+ result = -1; //At least one connection failed sending
+ } else {
+ entry->retryCount = 0;
+ if (nbytes != msgSize) {
+ L_ERROR("[TCP Socket] Seq; %d, MsgSize not correct: %d != %d (BufferSize: %d)\n",
+ header->seqNr, msgSize, nbytes, header->bufferSize);
+ }
}
- written = (result == 0) ? written + nbytes : written;
}
celixThreadRwlock_unlock(&handle->dbLock);
- return (result == 0 ? written : result);
+
+ //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
+ for (int i = 0; i < nofConnToClose; i++) {
+ pubsub_tcpHandler_closeConnection(handle, connFdCloseQueue[i]);
+ }
+ return result;
}
const char *pubsub_tcpHandler_url(pubsub_tcpHandler_t *handle) {
@@ -818,7 +894,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
if (nof_events < 0) {
if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {}
else L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
- errno = 0;
}
for (int i = 0; i < nof_events; i++) {
if ((handle->own.fd >= 0) && (events[i].data.fd == handle->own.fd)) {
@@ -830,7 +905,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
rc = fd;
if (rc == -1) {
L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
- errno = 0;
}
// Make file descriptor NonBlocking
if ((!handle->useBlockingWrite) && (rc >= 0)) {
@@ -851,6 +925,7 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
entry->addr = their_addr;
entry->len = len;
entry->connected = false;
+ entry->retryCount = 0;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
event.data.fd = entry->fd;
// Register Read to epoll
@@ -910,7 +985,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (rc != 0) {
L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
- errno = 0;
continue;
}
celixThreadRwlock_readLock(&handle->dbLock);
@@ -936,14 +1010,13 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
socklen_t len = sizeof(int);
rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (rc != 0) {
- L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n",strerror(errno));
- errno = 0;
+ L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
continue;
}
pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
} else if (events[i].events & EPOLLERR) {
L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n",strerror(errno));
- errno = 0;
+ pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
continue;
}
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index f1be060..cf64144 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -60,6 +60,10 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int time
void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned int msgIdOffset, unsigned int msgIdSize);
void pubsub_tcpHandler_setBlockingWrite(pubsub_tcpHandler_t *handle, bool blocking);
void pubsub_tcpHandler_setBlockingRead(pubsub_tcpHandler_t *handle, bool blocking);
+void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
+void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
+void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size);
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 23af6c3..9b4af40 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -198,8 +198,12 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
long msgIdOffset = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
long msgIdSize = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE, PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+ long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
+ double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
pubsub_tcpHandler_setBypassHeader(receiver->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
pubsub_tcpHandler_setBlockingRead(receiver->socketHandler, blocking);
+ pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
+ pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
}
psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 6795af1..ba7fbb3 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -144,8 +144,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
long msgIdOffset = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
long msgIdSize = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE, PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+ long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+ double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
pubsub_tcpHandler_setBypassHeader(sender->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
pubsub_tcpHandler_setBlockingWrite(sender->socketHandler, blocking);
+ pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
+ pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
}
/* Check if it's a static endpoint */
bool isEndPointTypeClient = false;
@@ -537,7 +541,7 @@ static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, co
sendCountUpdate = 1;
} else {
sendErrorUpdate = 1;
- L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno));
+ L_WARN("[PSA_TCP_TS] Error sending tcp.");
}
} else {
serializationErrorUpdate = 1;