You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 19:04:47 UTC
[celix] 03/11: Fix unit test
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git
commit bec22541abb5f012ec8bf3ed2a3f55d8d6c92096
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sun Jun 28 20:24:02 2020 +0200
Fix unit test
---
.../src/pubsub_psa_tcp_constants.h | 2 +-
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 275 +++++++++++++--------
2 files changed, 174 insertions(+), 103 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 302c9f6..ff8e68f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -60,7 +60,7 @@
//Time-out settings are only for BLOCKING connections
#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
-#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 0.0 //5.0
#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT 0.0
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 19cb0fd..b404ec4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -53,11 +53,12 @@
#define MAX_EVENTS 64
#define MAX_DEFAULT_BUFFER_SIZE 4u
-#define READ_STATE_INIT 0u
-#define READ_STATE_HEADER 1u
-#define READ_STATE_DATA 2u
-#define READ_STATE_READY 3u
-#define READ_STATE_FIND_HEADER 4u
+#define READ_STATE_HEADER 0u
+#define READ_STATE_PAYLOAD 1u
+#define READ_STATE_META 2u
+#define READ_STATE_FOOTER 3u
+#define READ_STATE_READY 4u
+#define READ_STATE_SYNC 5u
#if defined(__APPLE__)
#define MSG_NOSIGNAL (0)
@@ -94,7 +95,9 @@ typedef struct psa_tcp_connection_entry {
void *footerBuffer;
unsigned int bufferSize;
void *buffer;
- unsigned int bufferReadSize;
+ unsigned int bufferReadReadOffset;
+ unsigned int expectedBufferReadSize;
+ unsigned int msgSizeReadSize;
unsigned int metaBufferSize;
void *metaBuffer;
struct msghdr msg;
@@ -134,29 +137,18 @@ struct pubsub_tcpHandler {
bool running;
};
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
- struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr);
static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag );
+static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
+static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState);
static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
static void *pubsub_tcpHandler_thread(void *data);
//
@@ -357,6 +349,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
entry->msg_iovlen++;
+ pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
}
return entry;
}
@@ -469,6 +462,13 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
entry = NULL;
}
+ //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+ //if (rc < 0) {
+ // pubsub_tcpHandler_freeEntry(entry);
+ // L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
+ // entry = NULL;
+ // }
+
}
if ((rc >= 0) && (entry)) {
celixThreadRwlock_writeLock(&handle->dbLock);
@@ -576,7 +576,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
else {
rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+ L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
}
}
return rc;
@@ -791,21 +791,20 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
}
static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
- int expectedReadSize = size;
- int nbytes = size;
- int msgSize = 0;
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) {
+ int nbytes = entry->expectedBufferReadSize;
char* buffer = (char*)_buffer;
- while (nbytes > 0 && expectedReadSize > 0) {
+ while (nbytes > 0 && entry->expectedBufferReadSize > 0) {
// Read the message header
- nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
+ nbytes = recv(fd, &buffer[entry->bufferReadReadOffset], entry->expectedBufferReadSize, flag | MSG_NOSIGNAL);
// Update buffer administration
- offset += nbytes;
- expectedReadSize -= nbytes;
- msgSize += nbytes;
+ entry->bufferReadReadOffset += nbytes;
+ entry->expectedBufferReadSize-= nbytes;
}
- if (nbytes <=0) msgSize = nbytes;
- return msgSize;
+ if (nbytes == 0) {
+ L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, strerror(errno));
+ }
+ return nbytes;
}
@@ -828,6 +827,88 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
}
}
+ static inline
+ void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+ entry->bufferReadReadOffset = 0;
+ if (entry->state == READ_STATE_SYNC) {
+ entry->expectedBufferReadSize = entry->headerSize;
+ entry->state = READ_STATE_HEADER;
+ } else if (entry->state == READ_STATE_HEADER) {
+ if (entry->header.header.payloadSize) {
+ entry->state = READ_STATE_PAYLOAD;
+ entry->bufferReadReadOffset = entry->header.header.payloadOffset;
+ entry->expectedBufferReadSize = entry->header.header.payloadSize;
+ // For header less messages adjust offset and msg size;
+ if (!entry->headerBufferSize) {
+ entry->bufferReadReadOffset += entry->headerSize;
+ entry->expectedBufferReadSize -= entry->headerSize;
+ }
+ } else if (entry->header.header.metadataSize) {
+ entry->state = READ_STATE_META;
+ entry->expectedBufferReadSize = entry->header.header.metadataSize;
+ } else if (!entry->header.header.payloadSize && !entry->header.header.metadataSize) {
+ if (entry->footerSize) {
+ entry->state = READ_STATE_FOOTER;
+ entry->expectedBufferReadSize = entry->footerSize;
+ } else if (entry->header.header.isLastSegment) {
+ entry->state = READ_STATE_READY;
+ entry->expectedBufferReadSize = 0;
+ } else {
+ entry->state = READ_STATE_HEADER;
+ entry->expectedBufferReadSize = entry->headerSize;
+ }
+ }
+ } else if (entry->state == READ_STATE_PAYLOAD) {
+ if (entry->header.header.metadataSize) {
+ entry->state = READ_STATE_META;
+ entry->expectedBufferReadSize = entry->header.header.metadataSize;
+ } else {
+ if (entry->footerSize) {
+ entry->state = READ_STATE_FOOTER;
+ entry->expectedBufferReadSize = entry->footerSize;
+ } else if (entry->header.header.isLastSegment) {
+ entry->state = READ_STATE_READY;
+ entry->expectedBufferReadSize = 0;
+ } else {
+ entry->state = READ_STATE_HEADER;
+ entry->expectedBufferReadSize = entry->headerSize;
+ }
+ }
+ } else if (entry->state == READ_STATE_META) {
+ if (entry->footerSize) {
+ entry->state = READ_STATE_FOOTER;
+ entry->expectedBufferReadSize = entry->footerSize;
+ } else if (entry->header.header.isLastSegment) {
+ entry->state = READ_STATE_READY;
+ entry->expectedBufferReadSize = 0;
+ } else {
+ entry->state = READ_STATE_HEADER;
+ entry->expectedBufferReadSize = entry->headerSize;
+ }
+ } else if (entry->state == READ_STATE_FOOTER) {
+ if (entry->header.header.isLastSegment) {
+ entry->state = READ_STATE_READY;
+ } else {
+ entry->state = READ_STATE_HEADER;
+ entry->expectedBufferReadSize = entry->headerSize;
+ }
+ } else if (entry->state == READ_STATE_READY) {
+ entry->state = READ_STATE_HEADER;
+ entry->expectedBufferReadSize = entry->headerSize;
+ }
+}
+static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState){
+ entry->bufferReadReadOffset = 0;
+ if (nextState == READ_STATE_SYNC) {
+ entry->expectedBufferReadSize = entry->syncSize;
+ entry->state = nextState;
+ } else if (nextState == READ_STATE_HEADER) {
+ entry->expectedBufferReadSize = entry->headerSize;
+ entry->state = nextState;
+ }
+}
+
+
//
// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
@@ -858,35 +939,40 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
}
// Read the message
long int nbytes = 0;
- bool validMsg = false;
char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+ if (entry->state == READ_STATE_SYNC) {
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+ if (nbytes > 0) {
+ pubsub_tcpHandler_setReadStateMachine(handle, entry);
+ }
+ }
if (entry->state == READ_STATE_HEADER) {
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
if (nbytes >= entry->headerSize) { // Check header message buffer
if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
// Did not receive correct header
// skip sync word and try to read next header
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
if (!entry->headerError) {
L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
}
entry->headerError = true;
- entry->bufferReadSize = 0;
+ pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC);
} else {
// Read header message from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+ pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
if ((nbytes > 0) && (nbytes == entry->headerSize)) {
entry->headerError = false;
- entry->state == READ_STATE_DATA;
- // For headerless message, add header to bufferReadSize;
- if (!entry->headerBufferSize) entry->bufferReadSize += nbytes;
+ entry->msgSizeReadSize = 0;
+ // For headerless message, add header to bufferReadSize;
+ if (!entry->headerBufferSize) entry->msgSizeReadSize += nbytes;
+ pubsub_tcpHandler_setReadStateMachine(handle, entry);
}
}
}
}
- if (nentry->state == READ_STATE_DATA) {
-
+ if (entry->state == READ_STATE_PAYLOAD) {
// Alloc message buffers
if (entry->header.header.payloadSize > entry->bufferSize) {
handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
@@ -896,6 +982,17 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
entry->bufferSize = handle->bufferSize;
}
+ // Read payload data from queue
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
+ if (nbytes > 0) {
+ if (nbytes >= entry->header.header.payloadPartSize) {
+ entry->msgSizeReadSize += nbytes;
+ pubsub_tcpHandler_setReadStateMachine(handle, entry);
+ }
+ }
+ }
+
+ if (entry->state == READ_STATE_META) {
if (entry->header.header.metadataSize > entry->metaBufferSize) {
if (entry->metaBuffer) {
free(entry->metaBuffer);
@@ -906,64 +1003,38 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
}
}
- if (entry->header.header.payloadSize) {
- unsigned int offset = entry->header.header.payloadOffset;
- unsigned int size = entry->header.header.payloadPartSize;
- // For header less messages adjust offset and msg size;
- if (!entry->headerBufferSize) {
- offset += entry->headerSize;
- size -= offset;
- }
- // Read payload data from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
- if (nbytes > 0) {
- if (nbytes == size) {
- entry->bufferReadSize += nbytes;
- } else {
- entry->bufferReadSize = 0;
- L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
- }
- }
- }
- if (nbytes > 0 && entry->header.header.metadataSize) {
- // Read meta data from queue
- unsigned int size = entry->header.header.metadataSize;
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
- if ((nbytes > 0) && (nbytes != size)) {
- L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
- }
- }
- // Check for end of message using, footer of message. Because of streaming protocol
- if (nbytes > 0) {
- if (entry->footerSize > 0) {
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
- if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
- // valid footer, this means that the message is valid
- validMsg = true;
- } else {
- // Did not receive correct header
- L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
- entry->bufferReadSize = 0;
- }
- } else {
- // No Footer, then complete message is received
- validMsg = true;
- }
- }
- }
+ // Read meta data from (queue
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0);
+ if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) {
+ entry->msgSizeReadSize += nbytes;
+ pubsub_tcpHandler_setReadStateMachine(handle, entry);
}
}
+ if (entry->state == READ_STATE_FOOTER) {
+ // Check for end of message using, footer of message. Because of streaming protocol
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0);
+ if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
+ // valid footer, this means that the message is valid
+ pubsub_tcpHandler_setReadStateMachine(handle, entry);
+ } else {
+ // Did not receive correct header
+ L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+ pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+ }
+ }
+ if (entry->state == READ_STATE_READY) {
+ // Complete message is received
+ pubsub_tcpHandler_decodePayload(handle, entry);
+ pubsub_tcpHandler_setReadStateMachine(handle, entry);
+ }
+
if (nbytes > 0) {
entry->retryCount = 0;
- // Check if complete message is received
- if ((entry->bufferReadSize >= entry->header.header.payloadSize) &&
- validMsg &&
- entry->header.header.isLastSegment) {
- entry->bufferReadSize = 0;
- pubsub_tcpHandler_decodePayload(handle, entry);
- }
- } else {
- if (entry->retryCount < handle->maxRcvRetryCount) {
+ } else if (nbytes < 0) {
+ if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+ // Non blocking interrupt
+ entry->retryCount = 0;
+ } else if (entry->retryCount < handle->maxRcvRetryCount) {
entry->retryCount++;
L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
@@ -1143,7 +1214,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize)
break;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
}
message->header.payloadPartSize = msgPartSize;
message->header.payloadOffset = msgSize;
@@ -1171,7 +1242,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
msg.msg_iovlen++;
msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
}
void *headerData = NULL;
@@ -1196,7 +1267,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
L_ERROR("[TCP Socket] No header buffer is generated");
msg.msg_iovlen = 0;
}
- nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
+ nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags);
// When a specific socket keeps reporting errors can indicate a subscriber
// which is not active anymore, the connection will remain until the retry
// counter exceeds the maximum retry count.
@@ -1215,7 +1286,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
result = -1; //At least one connection failed sending
} else if (msgPartSize) {
entry->retryCount = 0;
- if (nbytes != msgSize) {
+ if (nbytes != msgPartSize) {
L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
}
}