You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 19:04:46 UTC
[celix] 02/11: Update for non blocking
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 3eb8877ca9b20d8867cfc5f19fe3c17535591d82
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sun Jun 28 08:06:27 2020 +0200
Update for non blocking
---
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 88 +++++++++++++---------
1 file changed, 52 insertions(+), 36 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 9ef6361..19cb0fd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -53,6 +53,12 @@
#define MAX_EVENTS 64
#define MAX_DEFAULT_BUFFER_SIZE 4u
+#define READ_STATE_INIT 0u
+#define READ_STATE_HEADER 1u
+#define READ_STATE_DATA 2u
+#define READ_STATE_READY 3u
+#define READ_STATE_FIND_HEADER 4u
+
#if defined(__APPLE__)
#define MSG_NOSIGNAL (0)
#endif
@@ -77,6 +83,7 @@ typedef struct psa_tcp_connection_entry {
socklen_t len;
bool connected;
bool headerError;
+ unsigned int state;
pubsub_protocol_message_t header;
unsigned int maxMsgSize;
unsigned int syncSize;
@@ -850,45 +857,54 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
entry->bufferSize = handle->bufferSize;
}
// Read the message
+ long int nbytes = 0;
bool validMsg = false;
char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
- int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
- if (nbytes > 0) {
- // Check header message buffer
- if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
- // Did not receive correct header
- // skip sync word and try to read next header
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
- if (!entry->headerError) {
- L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
- }
- entry->headerError = true;
- entry->bufferReadSize = 0;
- } else {
- // Read header message from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
- if ((nbytes > 0) && (nbytes == entry->headerSize)) {
- entry->headerError = false;
+ if (entry->state == READ_STATE_HEADER) {
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
+ if (nbytes >= entry->headerSize) { // Check header message buffer
+ if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
+ // Did not receive correct header
+ // skip sync word and try to read next header
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
+ if (!entry->headerError) {
+ L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+ }
+ entry->headerError = true;
+ entry->bufferReadSize = 0;
+ } else {
+ // Read header message from queue
+ nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+ if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+ entry->headerError = false;
+ entry->state == READ_STATE_DATA;
// For headerless message, add header to bufferReadSize;
- if (!entry->headerBufferSize)
- entry->bufferReadSize += nbytes;
- // Alloc message buffers
- if (entry->header.header.payloadSize > entry->bufferSize) {
- handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
- if (entry->buffer)
- free(entry->buffer);
- entry->buffer = malloc((size_t) handle->bufferSize);
- entry->bufferSize = handle->bufferSize;
- }
- if (entry->header.header.metadataSize > entry->metaBufferSize) {
- if (entry->metaBuffer) {
- free(entry->metaBuffer);
- entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
- entry->bufferSize = handle->bufferSize;
- L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
- entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
- }
- }
+ if (!entry->headerBufferSize) entry->bufferReadSize += nbytes;
+ }
+ }
+ }
+ }
+
+ if (nentry->state == READ_STATE_DATA) {
+
+ // Alloc message buffers
+ if (entry->header.header.payloadSize > entry->bufferSize) {
+ handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+ if (entry->buffer)
+ free(entry->buffer);
+ entry->buffer = malloc((size_t) handle->bufferSize);
+ entry->bufferSize = handle->bufferSize;
+ }
+
+ if (entry->header.header.metadataSize > entry->metaBufferSize) {
+ if (entry->metaBuffer) {
+ free(entry->metaBuffer);
+ entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
+ entry->metaBufferSize = entry->header.header.metadataSize;
+ L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
+ entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+ }
+ }
if (entry->header.header.payloadSize) {
unsigned int offset = entry->header.header.payloadOffset;