You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/09/16 20:58:47 UTC
[celix] branch develop updated: Fixed TCP transmissions (#50)
This is an automated email from the ASF dual-hosted git repository.
rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new ec12aaa Fixed TCP transmissions (#50)
ec12aaa is described below
commit ec12aaa78439b5a1b7f316b6955041e4cbd0ad8e
Author: rbulter <ro...@gmail.com>
AuthorDate: Mon Sep 16 22:58:42 2019 +0200
Fixed TCP transmissions (#50)
* Fix TCP high load traffic
* Don't remove connection
* Remove epoll event EPOLLET
* Remove test
* Refactor read handler function
* Reduce processing Load
---
CMakeLists.txt | 6 +-
.../src/pubsub_psa_tcp_constants.h | 21 +-
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 677 +++++++++++++--------
.../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 5 +-
.../pubsub_admin_tcp/src/pubsub_tcp_msg_header.h | 23 +-
.../src/pubsub_tcp_topic_receiver.c | 13 +-
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 24 +-
7 files changed, 476 insertions(+), 293 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2bac90d..76bf074 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,8 +38,10 @@ set(ENABLE_MORE_WARNINGS OFF)
IF (ANDROID)
set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -Wall ${CMAKE_C_FLAGS}")
ELSE ()
- set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -Wall -Werror -fPIC ${CMAKE_C_FLAGS}")
- set(CMAKE_CXX_FLAGS "-std=c++11 -Wall -Wextra -Weffc++ -fno-rtti -fno-exceptions ${CMAKE_CXX_FLAGS}")
+ set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -fPIC ${CMAKE_C_FLAGS}")
+ set(CMAKE_CXX_FLAGS "-std=c++11 -fno-rtti -fno-exceptions ${CMAKE_CXX_FLAGS}")
+ set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
+ set(CMAKE_CXX_FLAGS "-Wall -Wextra -Weffc++ ${CMAKE_CXX_FLAGS}")
set(CMAKE_C_FLAGS_DEBUG "-g -DDEBUG ${CMAKE_C_FLAGS}")
set(CMAKE_CXX_FLAGS_DEBUG "-g -DDEBUG ${CMAKE_CXX_FLAGS}")
ENDIF()
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 42ad2fa..a6bfc81 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -30,9 +30,9 @@
#define PSA_TCP_DEFAULT_BASE_PORT 5501
#define PSA_TCP_DEFAULT_MAX_PORT 6000
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 16
-#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 6500000
-#define PSA_TCP_DEFAULT_TIMEOUT 500
+#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 1
+#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 1024
+#define PSA_TCP_DEFAULT_TIMEOUT 2000
#define PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE 30
#define PSA_TCP_DEFAULT_QOS_CONTROL_SCORE 70
@@ -93,6 +93,21 @@
#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER "server"
#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT "client"
+
+/**
+ * The TCP admin supports sending messages without the pubsub header.
+ * In this case a message ID must be part of the message to be able to distinguish
+ * the different messages. The location of the message ID is configured with PUBSUB_TCP_MESSAGE_ID_OFFSET.
+ * The size of the message ID in bytes is specified with PUBSUB_TCP_MESSAGE_ID_SIZE.
+ * These properties can be set in the topic properties.
+ */
+#define PUBSUB_TCP_BYPASS_HEADER "tcp.static.bypass.header"
+#define PUBSUB_TCP_DEFAULT_BYPASS_HEADER false
+#define PUBSUB_TCP_MESSAGE_ID_OFFSET "tcp.static.message_id.offset"
+#define PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET 0
+#define PUBSUB_TCP_MESSAGE_ID_SIZE "tcp.static.message_id.size"
+#define PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE 4
+
/**
* Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
* internal TCP threads.
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index ef9ad60..42a7061 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -46,7 +46,14 @@
#define IP_HEADER_SIZE 20
#define TCP_HEADER_SIZE 20
#define MAX_EPOLL_EVENTS 64
-#define MAX_MSG_VECTOR_LEN 256
+#define MAX_MSG_VECTOR_LEN 64
+#define MAX_DEFAULT_BUFFER_SIZE 4u
+
+#define READ_STATE_INIT 0u
+#define READ_STATE_HEADER 1u
+#define READ_STATE_DATA 2u
+#define READ_STATE_READY 3u
+#define READ_STATE_FIND_HEADER 4u
#define L_DEBUG(...) \
logHelper_log(handle->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -57,47 +64,52 @@
#define L_ERROR(...) \
logHelper_log(handle->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+typedef struct psa_tcp_connection_entry {
+ char *url;
+ int fd;
+ struct sockaddr_in addr;
+ socklen_t len;
+ bool connected;
+ unsigned int bufferSize;
+ char *buffer;
+ int bufferReadSize;
+ int expectedReadSize;
+ int readState;
+ pubsub_tcp_msg_header_t header;
+
+} psa_tcp_connection_entry_t;
+
struct pubsub_tcpHandler {
- array_list_pt bufferLists;
- unsigned int bufferIdx;
unsigned int readSeqNr;
- unsigned int writeSeqNr;
unsigned int msgIdOffset;
+ unsigned int msgIdSize;
bool bypassHeader;
+ bool useBlocking;
celix_thread_rwlock_t dbLock;
unsigned int timeout;
hash_map_t *url_map;
hash_map_t *fd_map;
int efd;
- int fd;
- char *url;
pubsub_tcpHandler_connectMessage_callback_t connectMessageCallback;
pubsub_tcpHandler_connectMessage_callback_t disconnectMessageCallback;
void *connectPayload;
pubsub_tcpHandler_processMessage_callback_t processMessageCallback;
void *processMessagePayload;
log_helper_t *logHelper;
- unsigned int default_bufferSize;
- unsigned int default_buffer;
-};
-
-typedef struct pubsub_tcpBufferPartList {
- pubsub_tcp_msg_header_t default_header;
unsigned int bufferSize;
- char *buffer;
-} pubsub_tcpBufferPartList_t;
-
+ unsigned int maxNofBuffer;
+ psa_tcp_connection_entry_t own;
+};
-typedef struct psa_tcp_connection_entry {
- char *url;
- int fd;
- struct sockaddr_in addr;
- socklen_t len;
-} psa_tcp_connection_entry_t;
static inline int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle, const char *hostname, int port, struct sockaddr_in *inp);
static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle, int fd);
+static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
+//static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd);
+static inline void pubsub_tcpHandler_setupEntry(psa_tcp_connection_entry_t* entry, int fd, char *url, unsigned int bufferSize);
+static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t* entry);
//
@@ -106,15 +118,17 @@ static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle,
pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper) {
pubsub_tcpHandler_t *handle = calloc(sizeof(*handle), 1);
if (handle != NULL) {
- handle->fd = -1;
handle->efd = epoll_create1(0);
- handle->bufferIdx = 0;
handle->url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
handle->fd_map = hashMap_create(NULL, NULL, NULL, NULL);
- handle->timeout = 500;
+ handle->timeout = 2000; // default 2 sec
handle->logHelper = logHelper;
- handle->default_bufferSize =sizeof(handle->default_buffer);
- handle->default_buffer = 0;
+ handle->msgIdOffset = 0;
+ handle->msgIdSize = 4;
+ handle->bypassHeader = false;
+ handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
+ handle->maxNofBuffer = 1; // Reserved for future Use;
+ pubsub_tcpHandler_setupEntry(&handle->own, -1, NULL, MAX_DEFAULT_BUFFER_SIZE);
celixThreadRwlock_create(&handle->dbLock, 0);
//signal(SIGPIPE, SIG_IGN);
}
@@ -139,26 +153,9 @@ void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
}
if (handle->efd >= 0) close(handle->efd);
- if (handle->url) free(handle->url);
+ pubsub_tcpHandler_freeEntry(&handle->own);
hashMap_destroy(handle->url_map, false, false);
hashMap_destroy(handle->fd_map, false, false);
-
- if (handle->bufferLists != NULL) {
- int listSize = arrayList_size(handle->bufferLists);
- int i;
- for (i = 0; i < listSize; i++) {
- pubsub_tcpBufferPartList_t *item = arrayList_get(handle->bufferLists, i);
- if (item) {
- if (item->buffer) {
- free(item->buffer);
- item->buffer = NULL;
- }
- free(item);
- }
- }
- arrayList_destroy(handle->bufferLists);
- }
- handle->bufferLists = NULL;
celixThreadRwlock_unlock(&handle->dbLock);
celixThreadRwlock_destroy(&handle->dbLock);
free(handle);
@@ -213,83 +210,119 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle) {
int rc = 0;
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
- if ((handle->efd >= 0) && (handle->fd >= 0)) {
+ if ((handle->efd >= 0) && (handle->own.fd >= 0)) {
struct epoll_event event;
bzero(&event, sizeof(struct epoll_event)); // zero the struct
- rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, handle->fd, &event);
+ rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, handle->own.fd, &event);
if (rc < 0) {
L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
}
}
- if (handle->url) {
- free(handle->url);
- handle->url = NULL;
- }
- if (handle->fd >= 0) {
- close(handle->fd);
- handle->fd = -1;
- }
+ pubsub_tcpHandler_freeEntry(&handle->own);
celixThreadRwlock_unlock(&handle->dbLock);
}
return rc;
}
+static inline
+void pubsub_tcpHandler_setupEntry(psa_tcp_connection_entry_t* entry, int fd, char *url, unsigned int bufferSize) {
+ entry->fd = fd;
+ if (url) entry->url = strndup(url, 1024 * 1024);
+ if ((bufferSize > entry->bufferSize)&&(bufferSize)) {
+ entry->bufferSize = bufferSize;
+ if (entry->buffer) free(entry->buffer);
+ entry->buffer = calloc(sizeof(char), entry->bufferSize);
+ }
+ entry->connected = true;
+}
+
+static inline
+void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t* entry) {
+ if (entry->url) {
+ free(entry->url);
+ entry->url = NULL;
+ }
+ if (entry->fd >= 0) {
+ close(entry->fd);
+ entry->fd = -1;
+ }
+ if (entry->buffer) {
+ free(entry->buffer);
+ entry->buffer = NULL;
+ entry->bufferSize = 0;
+ }
+ entry->connected = false;
+}
+
int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
- pubsub_tcpHandler_url_t url_info;
- pubsub_tcpHandler_setUrlInfo(url, &url_info);
- psa_tcp_connection_entry_t *entry = NULL;
- int fd = pubsub_tcpHandler_open(handle, NULL);
- celixThreadRwlock_writeLock(&handle->dbLock);
- int rc = fd;
- struct sockaddr_in addr; // connector's address information
- if (rc >= 0) {
- rc = pubsub_tcpHandler_setInAddr(handle, url_info.hostname, url_info.portnr, &addr);
- if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot create url\n");
- close(fd);
+ int rc = 0;
+ psa_tcp_connection_entry_t *entry = hashMap_get(handle->url_map, (void *) (intptr_t) url);
+ if (entry == NULL) {
+ pubsub_tcpHandler_url_t url_info;
+ pubsub_tcpHandler_setUrlInfo(url, &url_info);
+ int fd = pubsub_tcpHandler_open(handle, NULL);
+ rc = fd;
+ struct sockaddr_in addr; // connector's address information
+ if (rc >= 0) {
+ rc = pubsub_tcpHandler_setInAddr(handle, url_info.hostname, url_info.portnr, &addr);
+ if (rc < 0) {
+ L_ERROR("[TCP Socket] Cannot create url\n");
+ close(fd);
+ }
}
- }
- if (rc >= 0) {
- rc = connect(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
- if (rc < 0) {
- //L_ERROR("[TCP Socket] Cannot connect to %s:%d\n", url_info.hostname, url_info.portnr);
- close(fd);
- fd = -1;
- } else {
- struct sockaddr_in sin;
- socklen_t len = sizeof(sin);
- entry = calloc(1, sizeof(*entry));
- entry->url = strndup(url, 1024 * 1024);
- entry->fd = fd;
- if (getsockname(fd, (struct sockaddr *) &sin, &len) < 0) {
- L_ERROR("[TCP Socket] getsockname %s\n", strerror(errno));
- } else if (handle->url == NULL) {
- char *address = inet_ntoa(sin.sin_addr);
- unsigned int port = ntohs(sin.sin_port);
- asprintf(&handle->url, "tcp://%s:%u", address, port);
+ // Make file descriptor NonBlocking
+ if ((!handle->useBlocking) && (rc >= 0)) {
+ rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+ if (rc < 0) close(fd);
+ }
+ // Connect to sender
+ if (rc >= 0) {
+ rc = connect(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
+ if (rc < 0 && errno != EINPROGRESS) {
+ L_ERROR("[TCP Socket] Cannot connect to %s:%d: err: %s\n", url_info.hostname, url_info.portnr,
+ strerror(errno));
+ close(fd);
+ errno = 0;
+ } else {
+ struct sockaddr_in sin;
+ socklen_t len = sizeof(sin);
+ entry = calloc(1, sizeof(*entry));
+ pubsub_tcpHandler_setupEntry(entry, fd, url, handle->bufferSize);
+ entry->connected = false; // Wait till epoll event, to report connected.
+ rc = getsockname(fd, (struct sockaddr *) &sin, &len);
+ if (rc < 0) {
+ L_ERROR("[TCP Socket] getsockname %s\n", strerror(errno));
+ errno = 0;
+ } else if (handle->own.url == NULL) {
+ char *address = inet_ntoa(sin.sin_addr);
+ unsigned int port = ntohs(sin.sin_port);
+ asprintf(&handle->own.url, "tcp://%s:%u", address, port);
+ }
}
}
- }
- if (rc >= 0) {
- struct epoll_event event;
- bzero(&event, sizeof(struct epoll_event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
- event.data.fd = entry->fd;
- rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
- if (rc < 0) {
- close(entry->fd);
- free(entry->url);
- free(entry);
- L_ERROR("[TCP Socket] Cannot create epoll\n");
+ // Subscribe File Descriptor to epoll
+ if ((rc >= 0) && (entry)) {
+ struct epoll_event event;
+ bzero(&event, sizeof(struct epoll_event)); // zero the struct
+ event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+ event.data.fd = entry->fd;
+ rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+ if (rc < 0) {
+ pubsub_tcpHandler_freeEntry(entry);
+ free(entry);
+ L_ERROR("[TCP Socket] Cannot create epoll %s\n", strerror(errno));
+ errno = 0;
+ entry = NULL;
+ }
}
+ if ((rc >= 0) && (entry)) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ hashMap_put(handle->url_map, entry->url, entry);
+ hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+ pubsub_tcpHandler_free_setUrlInfo(&url_info);
}
- if ((rc >= 0) && (entry)) {
- if (handle->connectMessageCallback) handle->connectMessageCallback(handle->connectPayload, entry->url, false);
- hashMap_put(handle->url_map, entry->url, entry);
- hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
- }
- pubsub_tcpHandler_free_setUrlInfo(&url_info);
- celixThreadRwlock_unlock(&handle->dbLock);
return rc;
}
@@ -320,14 +353,13 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *ha
rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
if (rc < 0) {
L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
+ errno = 0;
}
}
if (entry->fd >= 0) {
if (handle->disconnectMessageCallback)
handle->disconnectMessageCallback(handle->connectPayload, entry->url, lock);
- close(entry->fd);
- free(entry->url);
- entry->url = NULL;
+ pubsub_tcpHandler_freeEntry(entry);
free(entry);
}
}
@@ -343,7 +375,7 @@ static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle,
bool use_handle_fd = false;
psa_tcp_connection_entry_t *entry = NULL;
celixThreadRwlock_readLock(&handle->dbLock);
- if (fd != handle->fd) {
+ if (fd != handle->own.fd) {
entry = hashMap_get(handle->fd_map, (void *) (intptr_t) fd);
} else {
use_handle_fd = true;
@@ -361,45 +393,68 @@ static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle,
return rc;
}
+static inline
+int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
+ int rc = 0;
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1) rc = flags;
+ else {
+ rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ if (rc < 0) {
+ L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+ errno = 0;
+ }
+ }
+ return rc;
+}
+
+#ifdef USE_BLOCKING
+static inline
+int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd) {
+ int rc = 0;
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1) rc = flags;
+ else {
+ rc = fcntl(fd, F_SETFL, flags & (~O_NONBLOCK));
+ if (rc < 0) {
+ L_ERROR("[TCP Socket] Cannot set to BLOCKING epoll: %s\n", strerror(errno));
+ errno = 0;
+ }
+ }
+ return rc;
+}
+#endif
+
int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
- handle->fd = pubsub_tcpHandler_open(handle, url);
- handle->url = strndup(url, 1024 * 1024);
- int rc = handle->fd;
+ int fd = pubsub_tcpHandler_open(handle, url);
+ // Make handler fd entry
+ pubsub_tcpHandler_setupEntry(&handle->own, fd, url, MAX_DEFAULT_BUFFER_SIZE);
+ int rc = fd;
celixThreadRwlock_writeLock(&handle->dbLock);
if (rc >= 0) {
- rc = listen(handle->fd, SOMAXCONN);
+ rc = listen(fd, SOMAXCONN);
if (rc != 0) {
L_ERROR("[TCP Socket] Error listen: %s\n", strerror(errno));
- close(handle->fd);
- handle->fd = -1;
- free(handle->url);
- handle->url = NULL;
+ pubsub_tcpHandler_freeEntry(&handle->own);
+ errno = 0;
}
}
if (rc >= 0) {
- int flags = fcntl(handle->fd, F_GETFL, 0);
- if (flags == -1) {
- rc = flags;
- } else {
- rc = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
- if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll\n");
- close(handle->fd);
- handle->fd = -1;
- free(handle->url);
- handle->url = NULL;
- }
+ rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+ if (rc < 0) {
+ pubsub_tcpHandler_freeEntry(&handle->own);
}
}
if ((rc >= 0) && (handle->efd >= 0)) {
struct epoll_event event;
bzero(&event, sizeof(event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
- event.data.fd = handle->fd;
- rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, handle->fd, &event);
+ event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+ event.data.fd = fd;
+ rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event);
if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot create epoll\n");
+ L_ERROR("[TCP Socket] Cannot create epoll: %s\n",strerror(errno));
+ errno = 0;
}
}
celixThreadRwlock_unlock(&handle->dbLock);
@@ -416,7 +471,8 @@ int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle, const char *hostnam
if (!inet_aton(hostname, &inp->sin_addr)) {
hp = gethostbyname(hostname);
if (hp == NULL) {
- L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s\n", hostname);
+ L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s, %s\n", hostname, strerror(errno));
+ errno = 0;
return -1;
}
inp->sin_addr = *(struct in_addr *) hp->h_addr;
@@ -451,7 +507,7 @@ void pubsub_tcpHandler_setUrlInfo(char *url, pubsub_tcpHandler_url_t *url_info)
url_info->hostname = strtok(strdup(hostname), ":");
if (port) {
port += 1;
- unsigned int portDigits = (unsigned int) atoi(port);
+ unsigned int portDigits = (unsigned) atoi(port);
if (portDigits != 0) url_info->portnr = portDigits;
}
free(hostname);
@@ -469,20 +525,13 @@ void pubsub_tcpHandler_free_setUrlInfo(pubsub_tcpHandler_url_t *url_info) {
}
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, unsigned int maxNofBuffers,
+int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
+ unsigned int maxNofBuffers __attribute__ ((__unused__)),
unsigned int bufferSize) {
if (handle != NULL) {
- int i = 0;
celixThreadRwlock_writeLock(&handle->dbLock);
- if (arrayList_create(&handle->bufferLists) != CELIX_SUCCESS) {
- return -1;
- }
- for (i = 0; i < maxNofBuffers; i++) {
- pubsub_tcpBufferPartList_t *item = calloc(1, sizeof(struct pubsub_tcpBufferPartList));
- item->buffer = calloc(sizeof(char), bufferSize);
- item->bufferSize = bufferSize;
- arrayList_add(handle->bufferLists, item);
- }
+ handle->bufferSize = bufferSize;
+ handle->maxNofBuffer = maxNofBuffers;
celixThreadRwlock_unlock(&handle->dbLock);
}
return 0;
@@ -496,6 +545,15 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int time
}
}
+void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned int msgIdOffset, unsigned int msgIdSize) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->bypassHeader = bypassHeader;
+ handle->msgIdOffset = msgIdOffset;
+ handle->msgIdSize = msgIdSize;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+}
//
// Reads data from the file descriptor which has data (determined by epoll()) and stores it in the internal structure
@@ -503,43 +561,103 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int time
//
int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size) {
celixThreadRwlock_writeLock(&handle->dbLock);
- if (handle->bufferLists == NULL) {
- int nbytes = recv(fd, &handle->default_buffer, handle->default_bufferSize, MSG_PEEK);
+ psa_tcp_connection_entry_t *entry = NULL;
+ if (fd == handle->own.fd) entry = &handle->own;
+ else entry = hashMap_get(handle->fd_map, (void *) (intptr_t) fd);
+ // Find FD entry
+ if (entry == NULL) {
celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
+ return -1;
}
- int listSize = arrayList_size(handle->bufferLists);
- pubsub_tcpBufferPartList_t *item = arrayList_get(handle->bufferLists, handle->bufferIdx);
- if (!handle->bypassHeader) {
- // Only read the header, we don't know yet where to store the payload
- int nbytes = recv(fd, item->buffer, sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int), MSG_PEEK);
- if (nbytes < 0) {
- L_ERROR("[TCP Socket] read error \n");
- celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
- }
- unsigned int *pBuffer_size = ((unsigned int *) &item->buffer[sizeof(pubsub_tcp_msg_header_t)]);
- unsigned int buffer_size = *pBuffer_size + sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int);
- if (buffer_size > item->bufferSize) {
- free(item->buffer);
- item->buffer = calloc(buffer_size, sizeof(char));
- item->bufferSize = buffer_size;
+ // If it's not connected return from function
+ if (!entry->connected) {
+ celixThreadRwlock_unlock(&handle->dbLock);
+ return -1;
+ }
+
+ // In init state
+ if (!entry->readState) {
+ entry->bufferReadSize = 0;
+ if (!handle->bypassHeader) {
+ // First start looking for header
+ entry->readState = READ_STATE_HEADER;
+ entry->expectedReadSize = sizeof(pubsub_tcp_msg_header_t);
+ } else {
+ // When no header use Max buffer size
+ entry->readState = READ_STATE_READY;
+ entry->expectedReadSize = entry->bufferSize;
}
}
- int nbytes = recv(fd, item->buffer, item->bufferSize, 0);
+ // Read the message
+ int nbytes = recv(fd, &entry->buffer[entry->bufferReadSize], entry->expectedReadSize, 0);
if (nbytes < 0) {
- L_ERROR("[TCP Socket] read error \n");
- celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
+ // Handle Socket error, when nbytes == 0 => Connection is lost
+ if (nbytes < 0) {
+ L_ERROR("[TCP Socket] read error %s\n", strerror(errno));
+ errno = 0;
+ }
}
- if (!handle->bypassHeader) {
- nbytes -= sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int);
+ if ((!handle->bypassHeader)&&(nbytes>0)) {
+ // Update buffer administration
+ entry->bufferReadSize += nbytes;
+ entry->expectedReadSize -= nbytes;
+ // When expected data is read then update state
+ if (entry->expectedReadSize <= 0) {
+ pubsub_tcp_msg_header_t *pHeader = (pubsub_tcp_msg_header_t *) entry->buffer;
+ if (entry->readState == READ_STATE_FIND_HEADER) {
+ // When header marker is not found, start finding header
+ if (pHeader->marker_start == MARKER_START_PATTERN) {
+ // header marker is found, then read the remaining data of the header and update to HEADER State
+ entry->expectedReadSize = sizeof(pubsub_tcp_msg_header_t) - sizeof(unsigned int);
+ entry->readState = READ_STATE_HEADER;
+ } else {
+ // keep looking for the header marker
+ entry->bufferReadSize = 0;
+ entry->expectedReadSize = sizeof(unsigned int);
+ }
+ // Check if the header contains the correct markers
+ } else if ((pHeader->marker_start != MARKER_START_PATTERN) || (pHeader->marker_end != MARKER_END_PATTERN)) {
+ // When markers are not correct, find a new marker and update state to FIND Header
+ L_ERROR(
+ "[TCP Socket] Read Header: Marker (%d) start: 0x%08X != 0x%08X stop: 0x%08X != 0x%08X errno: %s",
+ handle->readSeqNr, pHeader->marker_start, MARKER_START_PATTERN, pHeader->marker_end,
+ MARKER_END_PATTERN, strerror(errno));
+ entry->bufferReadSize = 0;
+ entry->expectedReadSize = sizeof(unsigned int);
+ entry->readState = READ_STATE_FIND_HEADER;
+ } else if (entry->readState == READ_STATE_HEADER) {
+ // Header is found, read the data from the socket, update state to READ_STATE_DATA
+ int buffer_size = pHeader->bufferSize + entry->bufferReadSize;
+ // When buffer is not big enough, reallocate buffer
+ if (buffer_size > entry->bufferSize) {
+ entry->buffer = realloc(entry->buffer, buffer_size);
+ entry->bufferSize = buffer_size;
+ //L_WARN("[TCP Socket: %d, url: %s, realloc read buffer: (%d, %d) \n", entry->fd, entry->url, entry->bufferSize, buffer_size);
+ }
+ // Set data read size
+ entry->expectedReadSize = pHeader->bufferSize;
+ entry->readState++;
+ // The data is read, update administration and set state to READ_STATE_READY
+ } else if (entry->readState == READ_STATE_DATA) {
+ handle->readSeqNr = pHeader->seqNr;
+ //fprintf(stdout, "ReadSeqNr: Count: %d\n", handle->readSeqNr);
+ nbytes -= sizeof(pubsub_tcp_msg_header_t);
+ if (nbytes == 0) {
+ errno = 0;
+ }
+ // if buffer does not contain header, reset buffer
+ if (nbytes < 0) {
+ L_ERROR("[TCP Socket] incomplete message\n");
+ entry->readState = READ_STATE_INIT;
+ } else {
+ entry->readState++;
+ }
+ }
+ }
}
-
- *index = handle->bufferIdx;
+ *index = 0;
+ // if read state is not ready, don't process buffer
*size = nbytes;
- handle->bufferIdx++;
- handle->bufferIdx = handle->bufferIdx % listSize;
celixThreadRwlock_unlock(&handle->dbLock);
return nbytes;
}
@@ -547,25 +665,31 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
//
// Read out the message which is indicated available by the pubsub_tcpHandler_dataAvailable function
//
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, unsigned int index, pubsub_tcp_msg_header_t **header,
- void **buffer, unsigned int size) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__ ((__unused__)),
+ pubsub_tcp_msg_header_t **header, void **buffer, unsigned int size) {
int result = 0;
celixThreadRwlock_readLock(&handle->dbLock);
- pubsub_tcpBufferPartList_t *item = arrayList_get(handle->bufferLists, index);
- if (item) {
+ psa_tcp_connection_entry_t *entry = hashMap_get(handle->fd_map, (void *) (intptr_t)fd);
+ if (entry == NULL) result = -1;
+ if (entry) {
+ result = (!entry->connected) ? -1 : result;
+ result = (entry->readState != READ_STATE_READY) ? -1 : result;
+ }
+ if (!result) {
if (handle->bypassHeader) {
- *header = &item->default_header;
- *buffer = item->buffer;
- item->default_header.type = (unsigned int) item->buffer[handle->msgIdOffset];
- item->default_header.seqNr = handle->readSeqNr++;
- item->default_header.sendTimeNanoseconds = 0;
- item->default_header.sendTimeNanoseconds = 0;
+ *header = &entry->header;
+ *buffer = entry->buffer;
+ entry->header.type = (unsigned int) entry->buffer[handle->msgIdOffset];
+ entry->header.seqNr = handle->readSeqNr++;
+ entry->header.sendTimeNanoseconds = 0;
+ entry->header.sendTimeNanoseconds = 0;
+ entry->readState = READ_STATE_INIT;
} else {
- *header = (pubsub_tcp_msg_header_t *) item->buffer;
- *buffer = &item->buffer[sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int)];
+ *header = (pubsub_tcp_msg_header_t *) entry->buffer;
+ *buffer = entry->buffer + sizeof(pubsub_tcp_msg_header_t);
+ entry->readState = READ_STATE_INIT;
}
- } else {
- result = -1;
+ entry->readState = READ_STATE_INIT;
}
celixThreadRwlock_unlock(&handle->dbLock);
return result;
@@ -602,7 +726,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
celixThreadRwlock_readLock(&handle->dbLock);
int result = 0;
int written = 0;
-
+ header->marker_start = MARKER_START_PATTERN;
+ header->marker_end = MARKER_END_PATTERN;
+ header->bufferSize = size;
hash_map_iterator_t iter = hashMapIterator_construct(handle->fd_map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
@@ -619,151 +745,178 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
msg.msg_controllen = 0;
if (!handle->bypassHeader) {
msg.msg_iov[0].iov_base = header;
- msg.msg_iov[0].iov_len = sizeof(*header);
- msg.msg_iov[1].iov_base = &size;
- msg.msg_iov[1].iov_len = sizeof(size);
- msg.msg_iov[2].iov_base = buffer;
- msg.msg_iov[2].iov_len = size;
- msg.msg_iovlen = 3;
+ msg.msg_iov[0].iov_len = sizeof(pubsub_tcp_msg_header_t);
+ msg.msg_iov[1].iov_base = buffer;
+ msg.msg_iov[1].iov_len = size;
+ msg.msg_iovlen = 2;
} else {
msg.msg_iov[0].iov_base = buffer;
msg.msg_iov[0].iov_len = size;
msg.msg_iovlen = 1;
}
- int nbytes = 0; //
+ int nbytes = 0;
if (entry->fd >= 0) nbytes = sendmsg(entry->fd, &msg, MSG_NOSIGNAL);
-
// Several errors are OK. When speculative write is being done we may not
- // be able to write a single byte from the socket. Also, SIGSTOP issued
- // by a debugging tool can result in EINTR error.
- if (nbytes == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
- result = 0;
- break;
- }
+ // be able to write a single byte to the socket buffer. (socket buffer full)
+ // In this case when socket is not blocking, exit write function.
+ // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
if (nbytes == -1) {
+ result = ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) ? 0 : -1;
L_ERROR("[TCP Socket] Cannot send msg %s\n", strerror(errno));
- result = -1;
- break;
+ errno = 0;
+ continue;
+ }
+ int msgSize = 0;
+ for (int i = 0; i < msg.msg_iovlen; i++) {
+ msgSize+=msg.msg_iov[i].iov_len;
+ }
+ if (nbytes != msgSize) {
+ L_ERROR("[TCP Socket] MsgSize not correct: %d != nBytess\n", msgSize, nbytes);
+ continue;
}
written += nbytes;
- handle->writeSeqNr++;
}
celixThreadRwlock_unlock(&handle->dbLock);
return (result == 0 ? written : result);
}
const char *pubsub_tcpHandler_url(pubsub_tcpHandler_t *handle) {
- return handle->url;
+ return handle->own.url;
}
int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
int rc = 0;
if (handle->efd >= 0) {
struct epoll_event events[MAX_EPOLL_EVENTS];
- int nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout);
+ int nof_events = 0;
+ nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout);
if (nof_events < 0) {
- L_ERROR("[TCP Socket] Cannot create epoll wait\n");
- return nof_events;
+ L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
+ errno = 0;
}
- int i = 0;
- for (i = 0; i < nof_events; i++) {
- if ((handle->fd >= 0) && (events[i].data.fd == handle->fd)) {
+ for (int i = 0; i < nof_events; i++) {
+ if ((handle->own.fd >= 0) && (events[i].data.fd == handle->own.fd)) {
celixThreadRwlock_writeLock(&handle->dbLock);
// new connection available
struct sockaddr_in their_addr;
socklen_t len = sizeof(struct sockaddr_in);
- rc = accept(handle->fd, &their_addr, &len);
+ int fd = accept(handle->own.fd, &their_addr, &len);
+ rc = fd;
if (rc == -1) {
- if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
- // already closed
- } else
- L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
- } else {
+ L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
+ errno = 0;
+ }
+ // Make file descriptor NonBlocking
+ if ((!handle->useBlocking) && (rc >= 0)) {
+ rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+ if (rc < 0) pubsub_tcpHandler_freeEntry(&handle->own);
+ }
+ if (rc >= 0){
// handle new connection:
// add it to reactor, etc
struct epoll_event event;
bzero(&event, sizeof(event)); // zero the struct
-
char *address = inet_ntoa(their_addr.sin_addr);
unsigned int port = ntohs(their_addr.sin_port);
char *url = NULL;
asprintf(&url, "tcp://%s:%u", address, port);
psa_tcp_connection_entry_t *entry = calloc(1, sizeof(*entry));
+ pubsub_tcpHandler_setupEntry(entry, fd, url, MAX_DEFAULT_BUFFER_SIZE);
entry->addr = their_addr;
- entry->len = len;
- entry->url = url;
- entry->fd = rc;
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
+ entry->len = len;
+ entry->connected = false;
+ event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
event.data.fd = entry->fd;
+ // Register Read to epoll
rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
if (rc < 0) {
- close(entry->fd);
- free(entry->url);
+ pubsub_tcpHandler_freeEntry(entry);
free(entry);
L_ERROR("[TCP Socket] Cannot create epoll\n");
} else {
- if (handle->connectMessageCallback)
- handle->connectMessageCallback(handle->connectPayload, entry->url, true);
hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
hashMap_put(handle->url_map, entry->url, entry);
- fprintf(stdout, "[TCP Socket] New connection to url: %s: \n", url);
+ L_INFO("[TCP Socket] New connection to url: %s: \n", url);
}
+ free(url);
}
celixThreadRwlock_unlock(&handle->dbLock);
} else if (events[i].events & EPOLLIN) {
- int err = 0;
- socklen_t len = sizeof(int);
- rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
- if (rc != 0) {
- L_ERROR("[TCP Socket]: ERROR read from socket \n");
- continue;
- }
- unsigned int index = 0;
- unsigned int size = 0;
- rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index, &size);
- if (rc == 0) {
- pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
- continue;
- } else if (rc < 0) {
- continue;
- }
- // Handle data
- pubsub_tcp_msg_header_t *msgHeader = NULL;
- void *buffer = NULL;
- rc = pubsub_tcpHandler_read(handle, index, &msgHeader, &buffer, size);
- if (rc != 0) {
- L_ERROR("[TCP Socket]: ERROR read with index %d\n", index);
- continue;
- }
- celixThreadRwlock_readLock(&handle->dbLock);
- if (handle->processMessageCallback) {
+ int count = 0;
+ bool isReading = true;
+ while(isReading) {
+ unsigned int index = 0;
+ unsigned int size = 0;
+ isReading = (handle->useBlocking) ? false : isReading;
+ count++;
+ rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index, &size);
+ if (rc <= 0) {
+ // close connection.
+ if (rc == 0) {
+ pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
+ }
+ isReading = false;
+ break;
+ }
+ // Handle data
+ void *buffer = NULL;
+ pubsub_tcp_msg_header_t *msgHeader = NULL;
+ rc = pubsub_tcpHandler_read(handle, events[i].data.fd, index, &msgHeader, &buffer, size);
+ if (rc != 0) {
+ isReading = false;
+ break;
+ }
+ celixThreadRwlock_readLock(&handle->dbLock);
+ if ((handle->processMessageCallback)&&(buffer)) {
struct timespec receiveTime;
clock_gettime(CLOCK_REALTIME, &receiveTime);
- handle->processMessageCallback(handle->processMessagePayload, msgHeader, buffer, size,
- &receiveTime);
+ handle->processMessageCallback(handle->processMessagePayload, msgHeader, buffer, size, &receiveTime);
+ isReading = false;
+ }
+ celixThreadRwlock_unlock(&handle->dbLock);
}
- celixThreadRwlock_unlock(&handle->dbLock);
} else if (events[i].events & EPOLLOUT) {
int err = 0;
socklen_t len = sizeof(int);
rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (rc != 0) {
- L_ERROR("[TCP Socket]: ERROR read from socket \n");
+ L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
+ errno = 0;
continue;
}
+ celixThreadRwlock_readLock(&handle->dbLock);
+ psa_tcp_connection_entry_t *entry = hashMap_get(handle->fd_map, (void *) (intptr_t) events[i].data.fd);
+ if (entry)
+ if ((!entry->connected)) {
+ // tell sender that an receiver is connected
+ if (handle->connectMessageCallback) handle->connectMessageCallback(handle->connectPayload, entry->url, false);
+ entry->connected = true;
+ struct epoll_event event;
+ bzero(&event, sizeof(event)); // zero the struct
+ event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+ event.data.fd = events[i].data.fd;
+ // Register Modify epoll
+ rc = epoll_ctl(handle->efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
+ //pubsub_tcpHandler_makeBlocking(handle, events[i].data.fd);
+ if (rc < 0) {
+ L_ERROR("[TCP Socket] Cannot create epoll\n");
+ }
+ }
+ celixThreadRwlock_unlock(&handle->dbLock);
} else if (events[i].events & EPOLLRDHUP) {
int err = 0;
socklen_t len = sizeof(int);
rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (rc != 0) {
- L_ERROR("[TCP Socket]: ERROR read from socket \n");
+ L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n",strerror(errno));
+ errno = 0;
continue;
}
pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
} else if (events[i].events & EPOLLERR) {
- L_ERROR("[TCP Socket]: ERROR read from socket \n");
+ L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n",strerror(errno));
+ errno = 0;
continue;
}
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index c618a6f..c04e05b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -57,9 +57,10 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char* url);
int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, unsigned int maxNofBuffers, unsigned int bufferSize);
void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
+void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned int msgIdOffset, unsigned int msgIdSize);
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size);
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
+ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size);
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t* header, void* buffer, unsigned int size, int flags);
int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void* payload, pubsub_tcpHandler_processMessage_callback_t processMessageCallback);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
index e47e76d..38bff8f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
@@ -20,14 +20,21 @@
#ifndef PUBSUB_PSA_TCP_MSG_HEADER_H_
#define PUBSUB_PSA_TCP_MSG_HEADER_H_
+#define MARKER_START_PATTERN (0x56781234)
+#define MARKER_END_PATTERN (0x67812345)
+
typedef struct pubsub_tcp_msg_header {
- uint32_t type; //msg type id (hash of fqn)
- uint32_t seqNr;
- uint8_t major;
- uint8_t minor;
- unsigned char originUUID[16];
- uint64_t sendtimeSeconds; //seconds since epoch
- uint64_t sendTimeNanoseconds; //ns since epoch
+ uint32_t marker_start;
+ uint32_t type; //msg type id (hash of fqn)
+ uint32_t seqNr;
+ uint8_t major;
+ uint8_t minor;
+ uint16_t padding;
+ unsigned char originUUID[16];
+ uint64_t sendtimeSeconds; //seconds since epoch
+ uint64_t sendTimeNanoseconds; //ns since epoch
+ uint32_t bufferSize; //Size of the buffer
+ uint32_t marker_end;
} pubsub_tcp_msg_header_t;
-#endif
\ No newline at end of file
+#endif
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index d65c6f7..c2f12e6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -193,6 +193,13 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler, psa_tcp_disConnectHandler);
}
+ if (topicProperties != NULL) {
+ bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
+ long msgIdOffset = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
+ long msgIdSize = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE, PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+ pubsub_tcpHandler_setBypassHeader(receiver->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
+ }
+
psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
PSA_TCP_DEFAULT_METRICS_ENABLED);
@@ -710,10 +717,10 @@ static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock)
pubsub_tcp_topic_receiver_t *receiver = handle;
L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url);
if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
+ psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
if (entry != NULL) {
- free(entry->url);
- free(entry);
+ entry->connected = false;
+ receiver->requestedConnections.allConnected = false;
}
if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 1f4b5d1..bdce2ae 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -138,8 +138,13 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+ sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
+ if (topicProperties != NULL) {
+ bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
+ long msgIdOffset = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
+ long msgIdSize = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE, PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+ pubsub_tcpHandler_setBypassHeader(sender->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
+ }
/* Check if it's a static endpoint */
bool isEndPointTypeClient = false;
bool isEndPointTypeServer = false;
@@ -188,23 +193,16 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
/* Randomized part due to same bundle publishing on different topics */
unsigned int port = rand_range(basePort, maxPort);
char *url = NULL;
- char *bindUrl = NULL;
- if(bindIP == NULL) {
- asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
- } else {
- asprintf(&bindUrl, "tcp://%s:%u", bindIP, port);
- }
-
- asprintf(&url, "tcp://%s:%u", bindIP, port);
- int rv = pubsub_tcpHandler_listen(sender->socketHandler, bindUrl);
+ if (bindIP == NULL) asprintf(&url, "tcp://0.0.0.0:%u", port);
+ else asprintf(&url, "tcp://%s:%u", bindIP, port);
+ int rv = pubsub_tcpHandler_listen(sender->socketHandler, url);
if (rv == -1) {
- L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+ L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", url, strerror(errno));
free(url);
} else {
sender->url = url;
}
retry++;
- free(bindUrl);
}
}
}