You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 19:04:47 UTC

[celix] 03/11: Fix unit test

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git

commit bec22541abb5f012ec8bf3ed2a3f55d8d6c92096
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sun Jun 28 20:24:02 2020 +0200

    Fix unit test
---
 .../src/pubsub_psa_tcp_constants.h                 |   2 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 275 +++++++++++++--------
 2 files changed, 174 insertions(+), 103 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 302c9f6..ff8e68f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -60,7 +60,7 @@
 
 //Time-out settings are only for BLOCKING connections
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY       "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
-#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   0.0 //5.0
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT   0.0
 
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 19cb0fd..b404ec4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -53,11 +53,12 @@
 #define MAX_EVENTS   64
 #define MAX_DEFAULT_BUFFER_SIZE 4u
 
-#define READ_STATE_INIT   0u
-#define READ_STATE_HEADER 1u
-#define READ_STATE_DATA   2u
-#define READ_STATE_READY  3u
-#define READ_STATE_FIND_HEADER 4u
+#define READ_STATE_HEADER       0u
+#define READ_STATE_PAYLOAD      1u
+#define READ_STATE_META         2u
+#define READ_STATE_FOOTER       3u
+#define READ_STATE_READY        4u
+#define READ_STATE_SYNC         5u
 
 #if defined(__APPLE__)
 #define MSG_NOSIGNAL (0)
@@ -94,7 +95,9 @@ typedef struct psa_tcp_connection_entry {
     void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
+    unsigned int bufferReadReadOffset;
+    unsigned int expectedBufferReadSize;
+    unsigned int msgSizeReadSize;
     unsigned int metaBufferSize;
     void *metaBuffer;
     struct msghdr msg;
@@ -134,29 +137,18 @@ struct pubsub_tcpHandler {
     bool running;
 };
 
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
-                              struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag );
+static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
+static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState);
 static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
 static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
 static void *pubsub_tcpHandler_thread(void *data);
 
 //
@@ -357,6 +349,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
         entry->msg_iovlen++;
+        pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
     }
     return entry;
 }
@@ -469,6 +462,13 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
                 entry = NULL;
             }
+            //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            //if (rc < 0) {
+            //    pubsub_tcpHandler_freeEntry(entry);
+            //    L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
+            //    entry = NULL;
+           // }
+
         }
         if ((rc >= 0) && (entry)) {
             celixThreadRwlock_writeLock(&handle->dbLock);
@@ -576,7 +576,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
     else {
         rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
         if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
         }
     }
     return rc;
@@ -791,21 +791,20 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
 }
 
 static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) {
+    int nbytes = entry->expectedBufferReadSize; // bytes still wanted; reused for the last recv() result, which is the value returned to the caller
     char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
+    while (nbytes > 0 && entry->expectedBufferReadSize > 0) {
         // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
+        nbytes = recv(fd, &buffer[entry->bufferReadReadOffset], entry->expectedBufferReadSize, flag | MSG_NOSIGNAL);
         // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
+        entry->bufferReadReadOffset += (nbytes > 0) ? (unsigned int) nbytes : 0u; // recv() yields -1 on error: never fold that into the unsigned bookkeeping
+        entry->expectedBufferReadSize -= (nbytes > 0) ? (unsigned int) nbytes : 0u;
     }
-    if (nbytes <=0)  msgSize = nbytes;
-    return msgSize;
+    if (nbytes == 0) {
+        L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s.", entry->fd, strerror(errno));
+    }
+    return nbytes;
 }
 
 
@@ -828,6 +827,88 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
   }
 }
 
+// Shared tail of the read state machine, used once a segment's payload/metadata is handled:
+//   footer configured (entry->footerSize != 0)        -> READ_STATE_FOOTER
+//   no footer, header marks this as the last segment -> READ_STATE_READY, nothing left to read
+//   no footer, more segments expected                -> READ_STATE_HEADER
+static inline void pubsub_tcpHandler_setStateAfterSegmentBody(psa_tcp_connection_entry_t *entry) {
+    if (entry->footerSize) {
+        entry->state = READ_STATE_FOOTER;
+        entry->expectedBufferReadSize = entry->footerSize;
+    } else if (entry->header.header.isLastSegment) {
+        entry->state = READ_STATE_READY;
+        entry->expectedBufferReadSize = 0;
+    } else {
+        entry->state = READ_STATE_HEADER;
+        entry->expectedBufferReadSize = entry->headerSize;
+    }
+}
+
+// Moves entry->state from the state that just completed to its successor and primes
+// entry->bufferReadReadOffset / entry->expectedBufferReadSize for the next socket read.
+// Transitions are chosen from the sizes and flags of the decoded entry->header.
+// handle is not used; a state outside the READ_STATE_* set only gets its read offset reset.
+static inline
+void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+    entry->bufferReadReadOffset = 0;
+    switch (entry->state) {
+    case READ_STATE_SYNC:
+    case READ_STATE_READY:
+        entry->state = READ_STATE_HEADER;
+        entry->expectedBufferReadSize = entry->headerSize;
+        break;
+    case READ_STATE_HEADER:
+        if (entry->header.header.payloadSize) {
+            entry->state = READ_STATE_PAYLOAD;
+            entry->bufferReadReadOffset = entry->header.header.payloadOffset;
+            entry->expectedBufferReadSize = entry->header.header.payloadSize;
+            if (!entry->headerBufferSize) {
+                // Header-less messages: the header shares the payload buffer, so shift past it.
+                entry->bufferReadReadOffset += entry->headerSize;
+                entry->expectedBufferReadSize -= entry->headerSize;
+            }
+        } else if (entry->header.header.metadataSize) {
+            entry->state = READ_STATE_META;
+            entry->expectedBufferReadSize = entry->header.header.metadataSize;
+        } else {
+            pubsub_tcpHandler_setStateAfterSegmentBody(entry);
+        }
+        break;
+    case READ_STATE_PAYLOAD:
+        if (entry->header.header.metadataSize) {
+            entry->state = READ_STATE_META;
+            entry->expectedBufferReadSize = entry->header.header.metadataSize;
+        } else {
+            pubsub_tcpHandler_setStateAfterSegmentBody(entry);
+        }
+        break;
+    case READ_STATE_META:
+        pubsub_tcpHandler_setStateAfterSegmentBody(entry);
+        break;
+    case READ_STATE_FOOTER:
+        if (entry->header.header.isLastSegment) {
+            entry->state = READ_STATE_READY; // expectedBufferReadSize is deliberately left as-is on this path
+        } else {
+            entry->state = READ_STATE_HEADER;
+            entry->expectedBufferReadSize = entry->headerSize;
+        }
+        break;
+    default:
+        break;
+    }
+}
+// Forces the read state machine into nextState. Only READ_STATE_SYNC and READ_STATE_HEADER are
+// handled; for any other value the read offset is rewound and nothing else changes.
+static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState){
+    const bool toSync = (nextState == READ_STATE_SYNC);
+    entry->bufferReadReadOffset = 0;
+    if (toSync || nextState == READ_STATE_HEADER) {
+        entry->expectedBufferReadSize = toSync ? entry->syncSize : entry->headerSize;
+        entry->state = nextState;
+    }
+}
+
+
 
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
@@ -858,35 +939,40 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
         }
     // Read the message
     long int nbytes = 0;
-    bool validMsg = false;
     char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    if (entry->state == READ_STATE_SYNC) {
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+        if (nbytes > 0) {
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
+        }
+    }
     if (entry->state == READ_STATE_HEADER) {
-      nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
+      nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
       if (nbytes >= entry->headerSize) { // Check header message buffer
           if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
               // Did not receive correct header
               // skip sync word and try to read next header
-              nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
               if (!entry->headerError) {
                   L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
               }
               entry->headerError = true;
-              entry->bufferReadSize = 0;
+              pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC);
           } else {
               // Read header message from queue
-              nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+              pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+              nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
               if ((nbytes > 0) && (nbytes == entry->headerSize)) {
                  entry->headerError = false;
-                 entry->state == READ_STATE_DATA;
-                // For headerless message, add header to bufferReadSize;
-                if (!entry->headerBufferSize) entry->bufferReadSize += nbytes;
+                 entry->msgSizeReadSize = 0;
+                 // For headerless message, add header to msgSizeReadSize;
+                 if (!entry->headerBufferSize) entry->msgSizeReadSize += nbytes;
+                 pubsub_tcpHandler_setReadStateMachine(handle, entry);
               }
           }
       }
     }
 
-    if (nentry->state == READ_STATE_DATA) {
-
+    if (entry->state == READ_STATE_PAYLOAD) {
         // Alloc message buffers
         if (entry->header.header.payloadSize > entry->bufferSize) {
             handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
@@ -896,6 +982,17 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
             entry->bufferSize = handle->bufferSize;
         }
 
+        // Read payload data from queue
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
+        if (nbytes > 0) {
+            if (nbytes >= entry->header.header.payloadPartSize) {
+                entry->msgSizeReadSize += nbytes;
+                pubsub_tcpHandler_setReadStateMachine(handle, entry);
+            }
+        }
+    }
+
+    if (entry->state == READ_STATE_META) {
         if (entry->header.header.metadataSize > entry->metaBufferSize) {
             if (entry->metaBuffer) {
                 free(entry->metaBuffer);
@@ -906,64 +1003,38 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
             }
         }
 
-                if (entry->header.header.payloadSize) {
-                    unsigned int offset = entry->header.header.payloadOffset;
-                    unsigned int size = entry->header.header.payloadPartSize;
-                    // For header less messages adjust offset and msg size;
-                    if (!entry->headerBufferSize) {
-                        offset += entry->headerSize;
-                        size -= offset;
-                    }
-                    // Read payload data from queue
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
-                    if (nbytes > 0) {
-                        if (nbytes == size) {
-                            entry->bufferReadSize += nbytes;
-                        } else {
-                            entry->bufferReadSize = 0;
-                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                        }
-                    }
-                }
-                if (nbytes > 0 && entry->header.header.metadataSize) {
-                    // Read meta data from queue
-                    unsigned int size = entry->header.header.metadataSize;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
-                    if ((nbytes > 0) && (nbytes != size)) {
-                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                    }
-                }
-                // Check for end of message using, footer of message. Because of streaming protocol
-                if (nbytes > 0) {
-                    if (entry->footerSize > 0) {
-                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
-                        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
-                            // valid footer, this means that the message is valid
-                            validMsg = true;
-                        } else {
-                            // Did not receive correct header
-                            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
-                            entry->bufferReadSize = 0;
-                        }
-                    } else {
-                        // No Footer, then complete message is received
-                        validMsg = true;
-                    }
-                }
-            }
+        // Read meta data from queue
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0);
+        if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) {
+            entry->msgSizeReadSize += nbytes;
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
     }
+    if (entry->state == READ_STATE_FOOTER) {
+        // Check for end of message using, footer of message. Because of streaming protocol
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0);
+        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
+            // valid footer, this means that the message is valid
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
+        } else {
+            // Did not receive correct footer
+            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+            pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+        }
+    }
+    if (entry->state == READ_STATE_READY) {
+        // Complete message is received
+        pubsub_tcpHandler_decodePayload(handle, entry);
+        pubsub_tcpHandler_setReadStateMachine(handle, entry);
+    }
+
     if (nbytes > 0) {
         entry->retryCount = 0;
-        // Check if complete message is received
-        if ((entry->bufferReadSize >= entry->header.header.payloadSize) &&
-             validMsg &&
-             entry->header.header.isLastSegment) {
-            entry->bufferReadSize = 0;
-            pubsub_tcpHandler_decodePayload(handle, entry);
-        }
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
+    } else if (nbytes < 0) {
+        if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+            // Non blocking interrupt
+            entry->retryCount = 0;
+        } else if (entry->retryCount < handle->maxRcvRetryCount) {
             entry->retryCount++;
             L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
                    strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
@@ -1143,7 +1214,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                         msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
                         if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize)
                             break;
-                        msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                     }
                     message->header.payloadPartSize = msgPartSize;
                     message->header.payloadOffset = msgSize;
@@ -1171,7 +1242,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     msg.msg_iovlen++;
                     msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
                     msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                 }
 
                 void *headerData = NULL;
@@ -1196,7 +1267,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     L_ERROR("[TCP Socket] No header buffer is generated");
                     msg.msg_iovlen = 0;
                 }
-                nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
+                nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags);
                 //  When a specific socket keeps reporting errors can indicate a subscriber
                 //  which is not active anymore, the connection will remain until the retry
                 //  counter exceeds the maximum retry count.
@@ -1215,7 +1286,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     result = -1; //At least one connection failed sending
                 } else if (msgPartSize) {
                     entry->retryCount = 0;
-                    if (nbytes != msgSize) {
+                    if (nbytes != msgPartSize) {
                         L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
                     }
                 }