You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 19:04:46 UTC

[celix] 02/11: Update for non blocking

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 3eb8877ca9b20d8867cfc5f19fe3c17535591d82
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sun Jun 28 08:06:27 2020 +0200

    Update for non blocking
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 88 +++++++++++++---------
 1 file changed, 52 insertions(+), 36 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 9ef6361..19cb0fd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -53,6 +53,12 @@
 #define MAX_EVENTS   64
 #define MAX_DEFAULT_BUFFER_SIZE 4u
 
+#define READ_STATE_INIT   0u
+#define READ_STATE_HEADER 1u
+#define READ_STATE_DATA   2u
+#define READ_STATE_READY  3u
+#define READ_STATE_FIND_HEADER 4u
+
 #if defined(__APPLE__)
 #define MSG_NOSIGNAL (0)
 #endif
@@ -77,6 +83,7 @@ typedef struct psa_tcp_connection_entry {
     socklen_t len;
     bool connected;
     bool headerError;
+    unsigned int state;
     pubsub_protocol_message_t header;
     unsigned int maxMsgSize;
     unsigned int syncSize;
@@ -850,45 +857,54 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
             entry->bufferSize = handle->bufferSize;
         }
     // Read the message
+    long int nbytes = 0;
     bool validMsg = false;
     char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
-    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
-    if (nbytes > 0) {
-        // Check header message buffer
-        if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
-            // Did not receive correct header
-            // skip sync word and try to read next header
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
-            if (!entry->headerError) {
-                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
-            }
-            entry->headerError = true;
-            entry->bufferReadSize = 0;
-        } else {
-            // Read header message from queue
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
-            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                entry->headerError = false;
+    if (entry->state == READ_STATE_HEADER) {
+      nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
+      if (nbytes >= entry->headerSize) { // Check header message buffer
+          if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
+              // Did not receive correct header
+              // skip sync word and try to read next header
+              nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
+              if (!entry->headerError) {
+                  L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+              }
+              entry->headerError = true;
+              entry->bufferReadSize = 0;
+          } else {
+              // Read header message from queue
+              nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+              if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                 entry->headerError = false;
+                 entry->state == READ_STATE_DATA;
                 // For headerless message, add header to bufferReadSize;
-                if (!entry->headerBufferSize)
-                    entry->bufferReadSize += nbytes;
-                // Alloc message buffers
-                if (entry->header.header.payloadSize > entry->bufferSize) {
-                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-                    if (entry->buffer)
-                        free(entry->buffer);
-                    entry->buffer = malloc((size_t) handle->bufferSize);
-                    entry->bufferSize = handle->bufferSize;
-                }
-                if (entry->header.header.metadataSize > entry->metaBufferSize) {
-                    if (entry->metaBuffer) {
-                        free(entry->metaBuffer);
-                        entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
-                        entry->bufferSize = handle->bufferSize;
-                        L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
-                               entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-                    }
-                }
+                if (!entry->headerBufferSize) entry->bufferReadSize += nbytes;
+              }
+          }
+      }
+    }
+
+    if (nentry->state == READ_STATE_DATA) {
+
+        // Alloc message buffers
+        if (entry->header.header.payloadSize > entry->bufferSize) {
+            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+            if (entry->buffer)
+                free(entry->buffer);
+            entry->buffer = malloc((size_t) handle->bufferSize);
+            entry->bufferSize = handle->bufferSize;
+        }
+
+        if (entry->header.header.metadataSize > entry->metaBufferSize) {
+            if (entry->metaBuffer) {
+                free(entry->metaBuffer);
+                entry->metaBuffer     = malloc((size_t) entry->header.header.metadataSize);
+                entry->metaBufferSize = entry->header.header.metadataSize;
+                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
+                       entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+            }
+        }
 
                 if (entry->header.header.payloadSize) {
                     unsigned int offset = entry->header.header.payloadOffset;