You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/06/05 13:03:07 UTC
[celix] 01/01: refactor read and write function
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/make_tcp_admin_msg_sending_robust_when_tcp_send_timeout_expires
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 9cdf93bf575aa14323ee4ec45fae9b1d02c18492
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Fri Jun 5 15:02:05 2020 +0200
refactor read and write function
---
.../src/pubsub_psa_tcp_constants.h | 2 +
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 368 +++++++++++----------
.../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 3 +-
.../src/pubsub_tcp_topic_receiver.c | 9 +-
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 5 +-
5 files changed, 208 insertions(+), 179 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 3e7a7b3..6026212 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -61,9 +61,11 @@
//Time-out settings are only for BLOCKING connections
#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT 0.0
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT 0.0
#define PUBSUB_TCP_PSA_IP_KEY "PSA_IP"
#define PUBSUB_TCP_ADMIN_TYPE "tcp"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 93f0358..7b13b6f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -76,25 +76,28 @@ typedef struct psa_tcp_connection_entry {
struct sockaddr_in addr;
socklen_t len;
bool connected;
+ bool headerError;
pubsub_protocol_message_t header;
unsigned int syncSize;
unsigned int headerSize;
unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
void *headerBuffer;
+ void *footerBuffer;
unsigned int bufferSize;
void *buffer;
+ unsigned int bufferReadSize;
unsigned int metaBufferSize;
void *metaBuffer;
struct msghdr msg;
size_t msg_iovlen; /* Number of elements in the vector. */
unsigned int retryCount;
+ unsigned int seqNr;
} psa_tcp_connection_entry_t;
//
// Handle administration
//
struct pubsub_tcpHandler {
- unsigned int readSeqNr;
celix_thread_rwlock_t dbLock;
unsigned int timeout;
hash_map_t *connection_url_map;
@@ -137,9 +140,9 @@ static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry
static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
+static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
@@ -337,6 +340,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
entry->msg_iovlen++;
}
+ entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
entry->buffer = calloc(sizeof(char), entry->bufferSize);
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
@@ -378,6 +382,12 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
entry->headerBuffer = NULL;
entry->headerBufferSize = 0;
}
+
+ if (entry->footerBuffer) {
+ free(entry->footerBuffer);
+ entry->footerBuffer = NULL;
+ }
+
if (entry->metaBuffer) {
free(entry->metaBuffer);
entry->metaBuffer = NULL;
@@ -438,7 +448,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
#else
struct epoll_event event;
bzero(&event, sizeof(struct epoll_event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+ event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
event.data.fd = entry->fd;
rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
#endif
@@ -520,7 +530,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
psa_tcp_connection_entry_t *entry) {
int rc = 0;
if (handle != NULL && entry != NULL) {
- fprintf(stdout, "[TCP Socket] Close interface url: %s: \n", entry->url);
+ L_INFO("[TCP Socket] Close interface url: %s: \n", entry->url);
hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd);
if ((handle->efd >= 0)) {
#if defined(__APPLE__)
@@ -546,8 +556,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
//
// Make accept file descriptor non blocking
//
-static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
- int fd) {
+static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
int rc = 0;
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
@@ -714,13 +723,9 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
sch.sched_priority = prio;
pthread_setschedparam(handle->thread.thread, policy, &sch);
} else {
- printf("Skipping configuration of thread prio to %i and thread "
+ L_INFO("Skipping configuration of thread prio to %i and thread "
"scheduling to %s. No permission\n",
(int) prio, sched);
- celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_INFO,
- "Skipping configuration of thread prio to %i and thread "
- "scheduling to %s. No permission\n",
- (int) prio, sched);
}
celixThreadRwlock_unlock(&handle->dbLock);
}
@@ -759,14 +764,51 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
}
}
+/**
+ * Receive exactly `size` bytes from socket `fd` into `_buffer`, starting at byte `offset`.
+ *
+ * Without MSG_PEEK the data is consumed from the socket; recv() is repeated until all
+ * `size` bytes have arrived or recv() reports an error / end of stream.
+ *
+ * With MSG_PEEK the data stays queued in the socket, so every peek starts again at the
+ * head of the queue. Accumulating several short peeks would therefore duplicate the first
+ * bytes instead of completing the block. In peek mode a single recv() with MSG_WAITALL is
+ * issued and an incomplete block is reported as a transient failure (-1, errno = EAGAIN).
+ *
+ * @param handle  Unused here.
+ * @param entry   Unused here.
+ * @param fd      Socket to read from.
+ * @param _buffer Destination buffer; must hold at least offset + size bytes.
+ * @param offset  Byte position in `_buffer` where the received data is stored.
+ * @param size    Number of bytes to receive.
+ * @param flag    Extra recv() flags (e.g. MSG_PEEK); MSG_NOSIGNAL is always added.
+ * @return `size` when the complete block was received, otherwise the (<= 0) result of
+ *         the failing recv() call (0 also when `size` is 0).
+ */
+static inline
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
+    char *buffer = (char *) _buffer;
+    if (size == 0) {
+        return 0;
+    }
+    if (flag & MSG_PEEK) {
+        // Peeked bytes are not removed from the queue: ask for the whole block in one call
+        int peeked = (int) recv(fd, &buffer[offset], size, flag | MSG_WAITALL | MSG_NOSIGNAL);
+        if (peeked > 0 && (unsigned int) peeked < size) {
+            // Block not complete yet; let the caller retry instead of decoding partial data
+            errno = EAGAIN;
+            peeked = -1;
+        }
+        return peeked;
+    }
+    int remaining = (int) size;
+    int nbytes = (int) size;
+    int msgSize = 0;
+    while (nbytes > 0 && remaining > 0) {
+        nbytes = (int) recv(fd, &buffer[offset], (size_t) remaining, flag | MSG_NOSIGNAL);
+        // Only update the buffer administration when data was actually received
+        if (nbytes > 0) {
+            offset += (unsigned int) nbytes;
+            remaining -= nbytes;
+            msgSize += nbytes;
+        }
+    }
+    // Propagate error (< 0) or end of stream (0) of the last recv() call
+    if (nbytes <= 0) msgSize = nbytes;
+    return msgSize;
+}
+
+
+/**
+ * Decode the payload and metadata sections of the message held in `entry` using the
+ * protocol service, then pass the message to the registered message callback together
+ * with the receive timestamp. The entry buffer is released when the callback asks for it.
+ *
+ * @param handle TCP handler owning the protocol service and the message callback.
+ * @param entry  Connection entry whose header, buffer and metaBuffer hold the message.
+ */
+static inline
+void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+    pubsub_protocol_message_t *message = &entry->header;
+
+    if (message->header.payloadSize > 0) {
+        handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, message->header.payloadSize, message);
+    }
+    if (message->header.metadataSize > 0) {
+        handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
+                                         message->header.metadataSize, message);
+    }
+
+    // Without a callback or without payload there is nothing to deliver
+    if (!handle->processMessageCallback || message->payload.payload == NULL || !message->payload.length) {
+        return;
+    }
+
+    struct timespec receiveTime;
+    clock_gettime(CLOCK_REALTIME, &receiveTime);
+    bool releaseEntryBuffer = false;
+    handle->processMessageCallback(handle->processMessagePayload, message, &releaseEntryBuffer, &receiveTime);
+    if (releaseEntryBuffer) {
+        pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+    }
+}
+
+
//
// Reads data from the file descriptor which has data (determined by epoll()) and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and size have valid values
//
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, bool *readMsg) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
celixThreadRwlock_writeLock(&handle->dbLock);
- *index = 0;
- *readMsg = false;
psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
if (entry == NULL)
entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -784,93 +826,105 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
// Message buffer is to small, reallocate to make it bigger
if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
- char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
- if (buffer) {
- entry->buffer = buffer;
+ if (entry->buffer) free(entry->buffer);
+ entry->buffer = malloc((size_t) handle->bufferSize);
entry->bufferSize = handle->bufferSize;
}
- }
-
// Read the message
- entry->msg.msg_iovlen = 0;
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer
- : entry->buffer;
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
- entry->msg.msg_iovlen++;
- int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
+ bool validMsg = false;
+ char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+ int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
if (nbytes > 0) {
- entry->msg.msg_iovlen = 0;
- if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) {
- celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
-
- } else if (handle->protocol->decodeHeader(handle->protocol->handle,
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base,
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len, &entry->header) !=
- CELIX_SUCCESS) {
- entry->msg.msg_iov[0].iov_len = entry->syncSize;
- nbytes = recvmsg(fd, &entry->msg, 0);
- if (nbytes > 0)
- entry->retryCount = 0;
- celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
- }
- if (entry->header.header.payloadSize > entry->bufferSize) {
- handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
- char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
- if (buffer) {
- entry->buffer = buffer;
- entry->bufferSize = handle->bufferSize;
- }
- }
- if (entry->header.header.metadataSize > entry->metaBufferSize) {
- char *buffer = realloc(entry->metaBuffer, (size_t) entry->header.header.metadataSize);
- if (buffer) {
- entry->metaBuffer = buffer;
- entry->metaBufferSize = entry->header.header.metadataSize;
- L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
- entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
- }
- }
-
- if (entry->headerBufferSize)
- entry->msg.msg_iovlen++;
- if (entry->header.header.payloadSize) {
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.payloadSize;
- entry->msg.msg_iovlen++;
- }
- if (entry->header.header.metadataSize) {
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->metaBuffer;
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.metadataSize;
- entry->msg.msg_iovlen++;
- }
- nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL);
- } else {
- if (entry->retryCount < handle->maxRcvRetryCount) {
- entry->retryCount++;
- L_WARN("[TCP Socket] Failed to receive message header (fd: %d), error: %s. Retry count %u of %u,",
- entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+ // Check header message buffer
+ if (handle->protocol->decodeHeader(handle->protocol->handle,
+ header_buffer,
+ entry->headerSize,
+ &entry->header) != CELIX_SUCCESS) {
+ // Did not receive correct header
+ // skip sync word and try to read next header
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
+ if (!entry->headerError) L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+ entry->headerError = true;
+ entry->bufferReadSize = 0;
} else {
- L_ERROR(
- "[TCP Socket] Failed to receive message header (fd: %d) after %u retries! Closing connection... Error: %s",
- entry->fd,
- handle->maxRcvRetryCount,
- strerror(errno));
- nbytes = 0; //Return 0 as indicator to close the connection
+ // Read header message from queue
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+ if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+ entry->headerError = false;
+ // For headerless message, add header to bufferReadSize;
+ if (!entry->headerBufferSize)
+ entry->bufferReadSize += nbytes;
+ // Alloc message buffers
+ if (entry->header.header.payloadSize > entry->bufferSize) {
+ handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+ if (entry->buffer)
+ free(entry->buffer);
+ entry->buffer = malloc((size_t) handle->bufferSize);
+ entry->bufferSize = handle->bufferSize;
+ }
+ if (entry->header.header.metadataSize > entry->metaBufferSize) {
+ if (entry->metaBuffer) {
+ free(entry->metaBuffer);
+ entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
+ entry->bufferSize = handle->bufferSize;
+ L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
+ entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+ }
+ }
+
+ if (entry->header.header.payloadSize) {
+ unsigned int offset = entry->header.header.payloadOffset;
+ unsigned int size = entry->header.header.payloadPartSize;
+ // For header less messages adjust offset and msg size;
+ if (!entry->headerBufferSize) {
+ offset = entry->headerSize;
+ size -= offset;
+ }
+ // Read payload data from queue
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
+ if (nbytes > 0) {
+ if (nbytes == size) {
+ entry->bufferReadSize += nbytes;
+ } else {
+ entry->bufferReadSize = 0;
+ L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+ }
+ }
+ }
+ if (nbytes > 0 && entry->header.header.metadataSize) {
+ // Read meta data from queue
+ unsigned int size = entry->header.header.metadataSize;
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
+ if ((nbytes > 0) && (nbytes != size)) {
+ L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+ }
+ }
+ // Check for end of message using, header of next message. Because of streaming protocol
+ // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+ if (nbytes > 0) {
+ pubsub_protocol_message_t header;
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK);
+ if (handle->protocol->decodeHeader(handle->protocol->handle,
+ entry->footerBuffer,
+ entry->headerSize,
+ &header) == CELIX_SUCCESS) {
+ // valid header for next buffer, this means that the message is valid
+ validMsg = true;
+ } else {
+ // Did not receive correct header
+ L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+ entry->bufferReadSize = 0;
+ }
+ }
+ }
}
}
if (nbytes > 0) {
entry->retryCount = 0;
- unsigned int msgSize = 0;
- for (int i = 0; i < entry->msg.msg_iovlen; i++) {
- msgSize += entry->msg.msg_iov[i].iov_len;
- }
- if (nbytes == msgSize) {
- *readMsg = true;
- } else {
- L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) nbytes : %d = msgSize %d", entry->fd,
- nbytes, msgSize);
+ // Check if complete message is received
+ if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
+ entry->bufferReadSize = 0;
+ pubsub_tcpHandler_decodePayload(handle, entry);
}
} else {
if (entry->retryCount < handle->maxRcvRetryCount) {
@@ -887,34 +941,6 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
return nbytes;
}
-//
-// Read out the message which is indicated available by the largeUdp_dataAvailable function
-//
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__ ((__unused__)),
- pubsub_protocol_message_t **header) {
- int result = 0;
- celixThreadRwlock_readLock(&handle->dbLock);
- psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);;
- if (entry == NULL)
- entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
- if (entry == NULL)
- result = -1;
- if (entry)
- result = (!entry->connected) ? -1 : result;
- if (!result) {
- if (entry->header.header.payloadSize > 0) {
- handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize,
- &entry->header);
- }
- if (entry->header.header.metadataSize > 0) {
- handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
- entry->header.header.metadataSize, &entry->header);
- }
- *header = &entry->header;
- }
- celixThreadRwlock_unlock(&handle->dbLock);
- return result;
-}
int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
@@ -950,6 +976,44 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
return result;
}
+/**
+ * Send the complete scatter/gather message described by `msg` on the socket of `entry`.
+ *
+ * sendmsg() may accept only part of the data. After a partial send the iovec array in
+ * `msg` is advanced past the bytes that were accepted and sendmsg() is called again,
+ * until `size` bytes are written or an error occurs. `msg->msg_iov`, `msg->msg_iovlen`
+ * and the first remaining iovec entry are modified while doing so.
+ *
+ * @param handle Unused here.
+ * @param entry  Connection entry; nothing is sent when entry->fd is negative.
+ * @param msg    Message to send; modified when a partial send has to be continued.
+ * @param size   Total number of bytes described by the iovec entries of `msg`.
+ * @param flag   Extra sendmsg() flags; MSG_NOSIGNAL is always added.
+ * @return Number of bytes written, 0 when there was nothing to send, or the (<= 0)
+ *         result of the failing sendmsg() call.
+ */
+static inline
+int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
+    int nbytes = 0;
+    int msgSize = 0;
+    if (entry->fd >= 0 && size && msg->msg_iovlen) {
+        int remaining = (int) size;
+        nbytes = (int) size;
+        while (nbytes > 0 && remaining > 0) {
+            // Write (the rest of) the message
+            nbytes = (int) sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
+            if (nbytes <= 0) {
+                break;
+            }
+            remaining -= nbytes;
+            msgSize += nbytes;
+            if (remaining > 0) {
+                // Not all written: skip the iovec entries that were sent completely.
+                // `consumed` is always the byte count relative to the current entry.
+                size_t consumed = (size_t) nbytes;
+                size_t count = (size_t) msg->msg_iovlen;
+                size_t index = 0;
+                while (index < count && consumed >= msg->msg_iov[index].iov_len) {
+                    consumed -= msg->msg_iov[index].iov_len;
+                    index++;
+                }
+                if (index == count) {
+                    // `size` claims more data than the iovec entries hold; nothing left to send
+                    break;
+                }
+                msg->msg_iov += index;
+                msg->msg_iovlen -= index;
+                // Continue inside the partially sent entry
+                msg->msg_iov->iov_base = (char *) msg->msg_iov->iov_base + consumed;
+                msg->msg_iov->iov_len -= consumed;
+            }
+        }
+    }
+    // Propagate the error of the last sendmsg() call
+    if (nbytes <= 0) msgSize = nbytes;
+    return msgSize;
+}
//
// Write large data to TCP.
//
@@ -963,6 +1027,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->connected) continue;
void *payloadData = NULL;
size_t payloadSize = 0;
if (msg_iov_len == 1) {
@@ -973,6 +1038,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
}
}
+ message->header.seqNr = entry->seqNr;
message->header.payloadSize = payloadSize;
message->header.payloadPartSize = payloadSize;
message->header.payloadOffset = 0;
@@ -987,15 +1053,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
message->header.metadataSize = metadataSize;
size_t msgSize = 0;
- long int nbytes = 0;
struct msghdr msg;
struct iovec msg_iov[IOV_MAX];
+ memset(&msg, 0x00, sizeof(struct msghdr));
msg.msg_name = &entry->addr;
msg.msg_namelen = entry->len;
msg.msg_flags = flags;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
- msg.msg_iovlen = 0;
msg.msg_iov = msg_iov;
// Write generic seralized payload in vector buffer
@@ -1044,10 +1107,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
L_ERROR("[TCP Socket] No header buffer is generated");
msg.msg_iovlen = 0;
}
- nbytes = 0;
- if (entry->fd >= 0 && msgSize && msg.msg_iovlen) {
- nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
- }
+ long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
// When a specific socket keeps reporting errors can indicate a subscriber
// which is not active anymore, the connection will remain until the retry
// counter exceeds the maximum retry count.
@@ -1068,7 +1128,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
} else if (msgSize) {
entry->retryCount = 0;
if (nbytes != msgSize) {
- L_ERROR("[TCP Socket] MsgSize not correct: %d != %d\n", msgSize, nbytes);
+ L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", entry->seqNr, msgSize, nbytes, strerror(errno));
}
}
// Release data
@@ -1082,6 +1142,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (metadataData) {
free(metadataData);
}
+ entry->seqNr++;
}
}
celixThreadRwlock_unlock(&handle->dbLock);
@@ -1168,51 +1229,10 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
}
//
-// Handle sockets reads (blocking)
-//
-static inline
-void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd) {
- unsigned int index = 0;
- bool readMsg = false;
- int rc = pubsub_tcpHandler_dataAvailable(handle, fd, &index, &readMsg);
- if (rc <= 0) {
- // close connection.
- if (rc == 0)
- pubsub_tcpHandler_close(handle, fd);
- return;
- }
- if (readMsg) {
- // Handle data
- pubsub_protocol_message_t *header = NULL;
- rc = pubsub_tcpHandler_read(handle, fd, index, &header);
- if (rc < 0)
- return;
- celixThreadRwlock_readLock(&handle->dbLock);
- if (handle->processMessageCallback && header != NULL && header->payload.payload != NULL &&
- header->payload.length) {
- struct timespec receiveTime;
- clock_gettime(CLOCK_REALTIME, &receiveTime);
- bool releaseEntryBuffer = false;
- handle->processMessageCallback(handle->processMessagePayload, header, &releaseEntryBuffer, &receiveTime);
- if (releaseEntryBuffer)
- pubsub_tcpHandler_releaseEntryBuffer(handle, fd, index);
- }
- celixThreadRwlock_unlock(&handle->dbLock);
- }
-}
-
-//
// Handle sockets connection (sender)
//
static inline
void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) {
- int err = 0;
- socklen_t len = sizeof(int);
- int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
- if (rc != 0) {
- L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
- return;
- }
celixThreadRwlock_readLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
if (entry)
@@ -1255,7 +1275,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
pubsub_tcpHandler_connectionHandler(handle, fd);
} else if (events[i].filter & EVFILT_READ) {
- pubsub_tcpHandler_readHandler(handle, events[i].ident);
+ int rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
+ if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
} else if (events[i].flags & EV_EOF) {
int err = 0;
socklen_t len = sizeof(int);
@@ -1304,7 +1325,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
pubsub_tcpHandler_connectionHandler(handle, fd);
} else if (events[i].events & EPOLLIN) {
- pubsub_tcpHandler_readHandler(handle, events[i].data.fd);
+ rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
+ if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
} else if (events[i].events & EPOLLRDHUP) {
int err = 0;
socklen_t len = sizeof(int);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index 260edc1..ed4581c 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -68,8 +68,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned
void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, bool *readMsg);
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_protocol_message_t **header);
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
pubsub_protocol_message_t *message,
struct iovec *msg_iovec,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 8cbf8fc..0bd51c5 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -157,6 +157,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
+ bool isEndpoint = false;
bool isServerEndPoint = false;
/* Check if it's a static endpoint */
@@ -167,6 +168,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
if (endPointType != NULL) {
+ isEndpoint = true;
if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
staticClientEndPointUrls = staticConnectUrls;
@@ -207,8 +209,9 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
- double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,
- PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
+ double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,
+ (!isEndpoint) ? PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT :
+ PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT);
long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
@@ -789,4 +792,4 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t
}
return check;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 69c862a..47dc888 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -145,6 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
}
sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
PSA_TCP_DEFAULT_METRICS_ENABLED);
+ bool isEndpoint = false;
char *urls = NULL;
const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
const char *discUrl = NULL;
@@ -155,6 +156,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
/* Check if it's a static endpoint */
const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
if (endPointType != NULL) {
+ isEndpoint = true;
if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
@@ -192,7 +194,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
- PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+ (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
+ PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);