You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/05/31 12:49:41 UTC

[celix] branch feature/refactor_tcp_receive_function created (now 0255847)

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a change to branch feature/refactor_tcp_receive_function
in repository https://gitbox.apache.org/repos/asf/celix.git.


      at 0255847  Fix crashes when receiving big messages (500k)

This branch includes the following new commits:

     new 0255847  Fix crashes when receiving big messages (500k)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[celix] 01/01: Fix crashes when receiving big messages (500k)

Posted by rb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/refactor_tcp_receive_function
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 02558474957863e840e15d026a879578a6c7d41b
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sun May 31 14:49:15 2020 +0200

    Fix crashes when receiving big messages (500k)
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 188 ++++++++++++---------
 libs/framework/src/celix_log.c                     |   1 +
 2 files changed, 110 insertions(+), 79 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 93f0358..92ef423 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -83,6 +83,7 @@ typedef struct psa_tcp_connection_entry {
     void *headerBuffer;
     unsigned int bufferSize;
     void *buffer;
+    unsigned int bufferReadSize;
     unsigned int metaBufferSize;
     void *metaBuffer;
     struct msghdr msg;
@@ -128,6 +129,7 @@ pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_conn
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
 
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
+static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd);
 
 static inline psa_tcp_connection_entry_t *
 pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
@@ -139,6 +141,8 @@ static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *han
 
 static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
 
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
+
 static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
 
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
@@ -547,7 +551,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
 // Make accept file descriptor non blocking
 //
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
-                                                    int fd) {
+                                                 int fd) {
     int rc = 0;
     int flags = fcntl(fd, F_GETFL, 0);
     if (flags == -1)
@@ -562,6 +566,26 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
 }
 
 //
+// Make file descriptor blocking (clears O_NONBLOCK; counterpart of pubsub_tcpHandler_makeNonBlocking)
+//   handle: TCP handler; not referenced directly in this body
+//           (NOTE(review): presumably consumed by the L_ERROR logging macro - confirm)
+//   fd:     file descriptor whose file status flags are updated
+//   return: -1 when reading or writing the flags fails (errno is set by fcntl),
+//           otherwise the result of fcntl(F_SETFL)
+//
+static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle,
+                                                 int fd) {
+    int rc = 0;
+    // Fetch the current flags first so only O_NONBLOCK is changed
+    int flags = fcntl(fd, F_GETFL, 0);
+    if (flags == -1) {
+        rc = flags;
+    } else {
+        rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+        if (rc < 0) {
+            L_ERROR("[TCP Socket] Cannot set to BLOCKING: %s\n", strerror(errno));
+        }
+    }
+    return rc;
+}
+
+
+
+//
 // setup listening to interface (sender) using an url
 //
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
@@ -759,6 +783,20 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
+//
+// Read `size` bytes from the socket into `buffer`, starting at byte `offset`
+//   handle, entry: not used by this function (kept for the existing call sites)
+//   fd:     socket to read from
+//   buffer: destination; the caller must provide at least offset + size bytes
+//   flag:   extra recv() flags; MSG_NOSIGNAL is always added
+//   return: number of bytes stored (== size when the request was fully satisfied),
+//           0 when size is 0 or the peer closed the connection,
+//           -1 on a receive error (errno is set by recv)
+//
+static inline
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ) {
+    // Arithmetic on void* is not valid ISO C, so address the buffer as bytes
+    char *dst = (char *) buffer + offset;
+    unsigned int total = 0;
+    if (size == 0) {
+        return 0;
+    }
+    if (flag & MSG_PEEK) {
+        // A peek does not consume queued data: peeking again would return the same
+        // bytes, so looping with an advancing offset duplicates data in the buffer.
+        // Issue one request and let MSG_WAITALL wait for the full size instead.
+        ssize_t peeked;
+        do {
+            peeked = recv(fd, dst, size, flag | MSG_WAITALL | MSG_NOSIGNAL);
+        } while (peeked < 0 && errno == EINTR);
+        return (int) peeked;
+    }
+    while (total < size) {
+        ssize_t nbytes = recv(fd, dst + total, size - total, flag | MSG_NOSIGNAL);
+        if (nbytes < 0 && errno == EINTR) {
+            // Interrupted before any data was transferred: retry
+            continue;
+        }
+        if (nbytes <= 0) {
+            // Error or connection closed: report it without touching the counters
+            return (int) nbytes;
+        }
+        total += (unsigned int) nbytes;
+    }
+    // Report the total, not the last chunk, so callers can compare it with `size`
+    return (int) total;
+}
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
@@ -784,93 +822,84 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
         handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
-        char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-        if (buffer) {
-            entry->buffer = buffer;
-            entry->bufferSize = handle->bufferSize;
-        }
+        if (entry->buffer) free(entry->buffer);
+        entry->buffer = malloc((size_t) handle->bufferSize);
+        entry->bufferSize = handle->bufferSize;
     }
 
     // Read the message
-    entry->msg.msg_iovlen = 0;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer
-                                                                                   : entry->buffer;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
-    entry->msg.msg_iovlen++;
-    int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
+    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
     if (nbytes > 0) {
-        entry->msg.msg_iovlen = 0;
-        if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) {
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-
-        } else if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                                  entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base,
-                                                  entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len, &entry->header) !=
-            CELIX_SUCCESS) {
-            entry->msg.msg_iov[0].iov_len = entry->syncSize;
-            nbytes = recvmsg(fd, &entry->msg, 0);
-            if (nbytes > 0)
-                entry->retryCount = 0;
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-        }
-        if (entry->header.header.payloadSize > entry->bufferSize) {
-            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-            char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-            if (buffer) {
-                entry->buffer = buffer;
-                entry->bufferSize = handle->bufferSize;
-            }
-        }
-        if (entry->header.header.metadataSize > entry->metaBufferSize) {
-            char *buffer = realloc(entry->metaBuffer, (size_t) entry->header.header.metadataSize);
-            if (buffer) {
-                entry->metaBuffer = buffer;
-                entry->metaBufferSize = entry->header.header.metadataSize;
-                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
-                       entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-            }
-        }
-
-        if (entry->headerBufferSize)
-            entry->msg.msg_iovlen++;
-        if (entry->header.header.payloadSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.payloadSize;
-            entry->msg.msg_iovlen++;
-        }
-        if (entry->header.header.metadataSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->metaBuffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.metadataSize;
-            entry->msg.msg_iovlen++;
-        }
-        nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL);
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
-            entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message header (fd: %d), error: %s. Retry count %u of %u,",
-                   entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+        // Check header message buffer
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           header_buffer,
+                                           entry->headerSize,
+                                           &entry->header) != CELIX_SUCCESS) {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
+            L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            entry->bufferReadSize = 0;
         } else {
-            L_ERROR(
-                "[TCP Socket] Failed to receive message header (fd: %d) after %u retries! Closing connection... Error: %s",
-                entry->fd,
-                handle->maxRcvRetryCount,
-                strerror(errno));
-            nbytes = 0; //Return 0 as indicator to close the connection
+            // Read header message from queue
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                // For headerless message, add header to bufferReadSize;
+                if (!entry->headerBufferSize)
+                    entry->bufferReadSize += nbytes;
+                // Alloc message buffers
+                if (entry->header.header.payloadSize > entry->bufferSize) {
+                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+                    if (entry->buffer)
+                        free(entry->buffer);
+                    entry->buffer = malloc((size_t) handle->bufferSize);
+                    entry->bufferSize = handle->bufferSize;
+                }
+                if (entry->header.header.metadataSize > entry->metaBufferSize) {
+                    if (entry->metaBuffer)
+                        free(entry->metaBuffer);
+                    entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
+                    entry->bufferSize = handle->bufferSize;
+                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
+                           entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+                }
+                if (entry->header.header.payloadSize) {
+                    unsigned int offset = entry->header.header.payloadOffset;
+                    unsigned int size = entry->header.header.payloadPartSize;
+                    // For header less messages adjust offset and msg size;
+                    if (!entry->headerBufferSize) {
+                        offset = entry->headerSize;
+                        size -= offset;
+                    }
+                    // Read payload data from queue
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
+                    if (nbytes > 0) {
+                        if (nbytes == size) {
+                            entry->bufferReadSize += nbytes;
+                        } else {
+                            entry->bufferReadSize = 0;
+                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                        }
+                    }
+                }
+                if (nbytes > 0 && entry->header.header.metadataSize) {
+                    // Read meta data from queue
+                    unsigned int size = entry->header.header.metadataSize;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
+                    if ((nbytes > 0) && (nbytes != size)) {
+                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                    }
+                }
+            }
         }
     }
     if (nbytes > 0) {
         entry->retryCount = 0;
-        unsigned int msgSize = 0;
-        for (int i = 0; i < entry->msg.msg_iovlen; i++) {
-            msgSize += entry->msg.msg_iov[i].iov_len;
-        }
-        if (nbytes == msgSize) {
+        // Check if the complete message is received
+        if (entry->bufferReadSize >= entry->header.header.payloadSize) {
+            entry->bufferReadSize = 0;
             *readMsg = true;
-        } else {
-            L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) nbytes : %d = msgSize %d", entry->fd,
-                    nbytes, msgSize);
         }
     } else {
         if (entry->retryCount < handle->maxRcvRetryCount) {
@@ -1132,6 +1161,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
     if (rc >= 0) {
         // handle new connection:
         struct sockaddr_in sin;
+        pubsub_tcpHandler_makeBlocking(handle, fd);
         getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, &len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
         char *url = pubsub_utils_url_get_url(&their_addr, NULL);
@@ -1143,7 +1173,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
 #else
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..b2c074e 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -26,6 +26,7 @@
 #include "celix_log.h"
 #include "celix_threads.h"
 #include "celix_array_list.h"
+#include "memstream/open_memstream.h"
 
 #define LOG_NAME        "celix_framework"