You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 20:02:42 UTC
[celix] branch feature/tcp_admin_segm updated: Add message
segmentation to tcp admin
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/tcp_admin_segm
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/tcp_admin_segm by this push:
new 5ad77fe Add message segmentation to tcp admin
5ad77fe is described below
commit 5ad77fe50dfaea2f79e17b9e8d5e55ffba89da11
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Tue Oct 6 22:02:27 2020 +0200
Add message segmentation to tcp admin
---
bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt | 1 +
.../pubsub/pubsub_admin_tcp/src/psa_activator.c | 4 +-
.../src/pubsub_psa_tcp_constants.h | 34 +-
.../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 45 +-
.../pubsub_admin_tcp/src/pubsub_tcp_common.h | 2 +
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 719 +++++++++++----------
.../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 8 +-
.../src/pubsub_tcp_topic_receiver.c | 176 +++--
.../src/pubsub_tcp_topic_receiver.h | 3 +-
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 169 +++--
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h | 11 +-
.../src/pubsub_topology_manager.c | 6 +-
.../src/pubsub_topology_manager.h | 2 +-
.../pubsub/pubsub_utils/include/pubsub_utils_url.h | 8 +-
bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c | 22 +-
bundles/pubsub/test/CMakeLists.txt | 28 +-
bundles/pubsub/test/meta_data/ping2.properties | 2 +-
bundles/pubsub/test/meta_data/pong2.properties | 4 +-
bundles/pubsub/test/test/loopback_activator.c | 3 +-
bundles/pubsub/test/test/tst_endpoint_activator.c | 2 +-
20 files changed, 651 insertions(+), 598 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
index a3bedf4..a19b425 100644
--- a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
@@ -28,6 +28,7 @@ add_celix_bundle(celix_pubsub_admin_tcp
src/pubsub_tcp_topic_sender.c
src/pubsub_tcp_topic_receiver.c
src/pubsub_tcp_handler.c
+ src/pubsub_tcp_common.c
)
set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
index a5ae576..d93c478 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -121,9 +121,7 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp");
celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp");
- celix_properties_set(props,
- CELIX_SHELL_COMMAND_DESCRIPTION,
- "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+ celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 4284042..1c28086 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,19 +23,20 @@
#define PSA_TCP_BASE_PORT "PSA_TCP_BASE_PORT"
#define PSA_TCP_MAX_PORT "PSA_TCP_MAX_PORT"
-#define PSA_TCP_MAX_RECV_SESSIONS "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE "PSA_TCP_MAX_MESSAGE_SIZE"
#define PSA_TCP_RECV_BUFFER_SIZE "PSA_TCP_RECV_BUFFER_SIZE"
#define PSA_TCP_TIMEOUT "PSA_TCP_TIMEOUT"
#define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY "PSA_TCP_SEND_DELAY"
#define PSA_TCP_DEFAULT_BASE_PORT 5501
#define PSA_TCP_DEFAULT_MAX_PORT 6000
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE UINT32_MAX
#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 65 * 1024
#define PSA_TCP_DEFAULT_TIMEOUT 2000 // 2 seconds
#define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
+#define PSA_TCP_DEFAULT_SEND_DELAY 250 // 250 ms
#define PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE 30
#define PSA_TCP_DEFAULT_QOS_CONTROL_SCORE 70
@@ -102,21 +103,30 @@
*/
#define PUBSUB_TCP_STATIC_CONNECT_URLS "tcp.static.connect.urls"
+
/**
- * Name of environment variable with space-separated list of ips/urls to connect to
- * e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
+ * Defines if the publisher / subscriber is a passive endpoint and shares
+ * the connection with publisher / subscriber endpoint with the matching (passive) key
*/
-#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
+#define PUBSUB_TCP_PASSIVE_CONFIGURED "tcp.passive.configured"
+#define PUBSUB_TCP_PASSIVE_KEY "tcp.passive.key"
/**
- * The static endpoint type which a static endpoint should be configured.
- * Can be set in the topic properties.
+ * Name of environment variable to indicate that passive endpoint is configured
+ * e.g. PSA_TCP_PASSIVE_CONFIGURED_topic_scope="true"
*/
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE "tcp.static.endpoint.type"
-
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER "server"
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT "client"
+#define PUBSUB_TCP_PASSIVE_ENABLED "PSA_TCP_PASSIVE_CONFIGURED_"
+/**
+ * Name of environment variable to configure the passive key (see PUBSUB_TCP_PASSIVE_KEY )
+ * e.g. PSA_TCP_PASSIVE_KEY__topic_scope="tcp://localhost:4444"
+ */
+#define PUBSUB_TCP_PASSIVE_SELECTION_KEY "PSA_TCP_PASSIVE_KEY_"
+/**
+ * Name of environment variable with space-separated list of ips/urls to connect to
+ * e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
+ */
+#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
/**
* Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index e7ad284..086837f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -434,17 +434,19 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *serType = serEntry->serType;
const char *protType = protEntry->protType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, protType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
- celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
-
+ if (pubsub_tcpTopicSender_isPassive(sender)) {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+ } else {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+ }
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL)
- celix_properties_set(newEndpoint, "container_name", cn);
+ celix_properties_set(newEndpoint, "container_name", cn);
hashMap_put(psa->topicSenders.map, key, sender);
} else {
L_ERROR("[PSA TCP] Error creating a TopicSender");
@@ -636,24 +638,13 @@ pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
- const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
if (url == NULL) {
L_WARN("[PSA TCP] Error got endpoint without tcp url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) {
- if (scope == NULL && eScope == NULL) {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- }
- }
+ pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
}
return status;
@@ -689,6 +680,21 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
FILE *errStream __attribute__((unused))) {
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
+ char *line = celix_utils_strdup(commandLine);
+ char *token = line;
+ strtok_r(line, " ", &token); //first token is command name
+ strtok_r(NULL, " ", &token); //second token is sub command
+
+ if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+ if (celix_utils_stringEquals(token, "nr_of_senders")) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ }
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
@@ -707,11 +713,12 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
const char *scope = pubsub_tcpTopicSender_scope(sender);
const char *topic = pubsub_tcpTopicSender_topic(sender);
const char *url = pubsub_tcpTopicSender_url(sender);
+ const char *isPassive = pubsub_tcpTopicSender_isPassive(sender) ? " (passive)" : "";
const char *postUrl = pubsub_tcpTopicSender_isStatic(sender) ? " (static)" : "";
fprintf(out, "|- Topic Sender %s/%s\n", scope == NULL ? "(null)" : scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- protocol type = %s\n", protType);
- fprintf(out, " |- url = %s%s\n", url, postUrl);
+ fprintf(out, " |- url = %s%s%s\n", url, postUrl, isPassive);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
celixThreadMutex_unlock(&psa->protocols.mutex);
@@ -788,4 +795,4 @@ pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
return result;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index 1fe1d00..9ea31db 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -28,4 +28,6 @@ typedef struct pubsub_tcp_endPointStore {
hash_map_t *map;
} pubsub_tcp_endPointStore_t;
+bool psa_tcp_isPassive(const char* buffer);
+
#endif //CELIX_PUBSUB_TCP_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 0cbd28f..51103f0 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -31,6 +31,7 @@
#include <errno.h>
#include <array_list.h>
#include <pthread.h>
+#include <sys/ioctl.h>
#if defined(__APPLE__)
#include <sys/types.h>
#include <sys/event.h>
@@ -78,18 +79,26 @@ typedef struct psa_tcp_connection_entry {
bool connected;
bool headerError;
pubsub_protocol_message_t header;
+ unsigned int maxMsgSize;
unsigned int syncSize;
unsigned int headerSize;
- unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
- void *headerBuffer;
- unsigned int footerSize;
- void *footerBuffer;
+ unsigned int readHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
+ void *readHeaderBuffer;
+ unsigned int writeHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
+ void *writeHeaderBuffer;
+ unsigned int readFooterSize;
+ void *readFooterBuffer;
+ unsigned int writeFooterSize;
+ void *writeFooterBuffer;
unsigned int bufferSize;
void *buffer;
- unsigned int bufferReadSize;
- unsigned int metaBufferSize;
- void *metaBuffer;
+ size_t readMetaBufferSize;
+ void *readMetaBuffer;
+ size_t writeMetaBufferSize;
+ void *writeMetaBuffer;
unsigned int retryCount;
+ celix_thread_mutex_t writeMutex;
+ celix_thread_mutex_t readMutex;
} psa_tcp_connection_entry_t;
//
@@ -114,38 +123,25 @@ struct pubsub_tcpHandler {
celix_log_helper_t *logHelper;
pubsub_protocol_service_t *protocol;
unsigned int bufferSize;
- unsigned int maxNofBuffer;
+ unsigned int maxMsgSize;
unsigned int maxSendRetryCount;
unsigned int maxRcvRetryCount;
double sendTimeout;
double rcvTimeout;
celix_thread_t thread;
bool running;
+ bool enableReceiveEvent;
};
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
- struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr);
static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
static void *pubsub_tcpHandler_thread(void *data);
//
@@ -167,7 +163,6 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protoco
handle->logHelper = logHelper;
handle->protocol = protocol;
handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
- handle->maxNofBuffer = 1; // Reserved for future Use;
celixThreadRwlock_create(&handle->dbLock, 0);
handle->running = true;
celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, handle);
@@ -261,7 +256,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
}
}
- struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+ struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
if (addr) {
rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
if (rc != 0) {
@@ -311,6 +306,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
if (fd >= 0) {
entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
entry->fd = fd;
+ celixThreadMutex_create(&entry->writeMutex, NULL);
+ celixThreadMutex_create(&entry->readMutex, NULL);
if (url)
entry->url = strndup(url, 1024 * 1024);
if (interface_url) {
@@ -326,17 +323,29 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
handle->protocol->getHeaderSize(handle->protocol->handle, &size);
entry->headerSize = size;
handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size);
- entry->headerBufferSize = size;
+ entry->readHeaderBufferSize = size;
+ entry->writeHeaderBufferSize = size;
handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
entry->syncSize = size;
handle->protocol->getFooterSize(handle->protocol->handle, &size);
- entry->footerSize = size;
+ entry->readFooterSize = size;
+ entry->writeFooterSize = size;
entry->bufferSize = handle->bufferSize;
+ // incase header is included in the payload buffer, make at least the payload buffer the size of the header
+ if (!entry->readHeaderBufferSize) {
+ entry->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+ }
entry->connected = false;
- if (entry->headerBufferSize) {
- entry->headerBuffer = calloc(sizeof(char), entry->headerSize);
+ unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterSize;
+ if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) {
+ L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize);
+ } else {
+ entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : UINT32_MAX;
}
- if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+ if (entry->readHeaderBufferSize) entry->readHeaderBuffer = calloc(sizeof(char), entry->readHeaderBufferSize);
+ if (entry->writeHeaderBufferSize) entry->writeHeaderBuffer = calloc(sizeof(char), entry->writeHeaderBufferSize);
+ if (entry->readFooterSize) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterSize);
+ if (entry->writeFooterSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterSize);
if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
}
return entry;
@@ -348,40 +357,18 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
static inline void
pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
if (entry) {
- if (entry->url) {
- free(entry->url);
- entry->url = NULL;
- }
- if (entry->interface_url) {
- free(entry->interface_url);
- entry->interface_url = NULL;
- }
- if (entry->fd >= 0) {
- close(entry->fd);
- entry->fd = -1;
- }
- if (entry->buffer) {
- free(entry->buffer);
- entry->buffer = NULL;
- entry->bufferSize = 0;
- }
- if (entry->headerBuffer) {
- free(entry->headerBuffer);
- entry->headerBuffer = NULL;
- entry->headerBufferSize = 0;
- }
-
- if (entry->footerBuffer) {
- free(entry->footerBuffer);
- entry->footerBuffer = NULL;
- }
-
- if (entry->metaBuffer) {
- free(entry->metaBuffer);
- entry->metaBuffer = NULL;
- entry->metaBufferSize = 0;
- }
- entry->connected = false;
+ free(entry->url);
+ free(entry->interface_url);
+ if (entry->fd >= 0) close(entry->fd);
+ free(entry->buffer);
+ free(entry->readHeaderBuffer);
+ free(entry->writeHeaderBuffer);
+ free(entry->readFooterBuffer);
+ free(entry->writeFooterBuffer);
+ free(entry->readMetaBuffer);
+ free(entry->writeMetaBuffer);
+ celixThreadMutex_destroy(&entry->writeMutex);
+ celixThreadMutex_destroy(&entry->readMutex);
free(entry);
}
}
@@ -403,8 +390,7 @@ pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsign
//
int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
int rc = 0;
- psa_tcp_connection_entry_t *entry =
- hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+ psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
if (entry == NULL) {
pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
@@ -414,12 +400,11 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
socklen_t len = sizeof(sin);
getsockname(fd, (struct sockaddr *) &sin, &len);
char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
- struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+ struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
if ((rc >= 0) && addr) {
rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
if (rc < 0 && errno != EINPROGRESS) {
- L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url,
- strerror(errno));
+ L_ERROR("[TCP Socket] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno));
close(fd);
} else {
entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &sin);
@@ -552,7 +537,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
else {
rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+ L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
}
}
return rc;
@@ -570,6 +555,7 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
if (entry == NULL) {
char protocol[] = "tcp";
int fd = pubsub_tcpHandler_open(handle, url);
+ rc = fd;
struct sockaddr_in *sin = pubsub_utils_url_from_fd(fd);
// Make handler fd entry
char *pUrl = pubsub_utils_url_get_url(sin, protocol);
@@ -579,7 +565,6 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
free(pUrl);
free(sin);
celixThreadRwlock_writeLock(&handle->dbLock);
- rc = fd;
if (rc >= 0) {
rc = listen(fd, SOMAXCONN);
if (rc != 0) {
@@ -628,16 +613,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
}
//
-// Setup buffer sizes
+// Setup receive buffer size
//
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
- unsigned int maxNofBuffers
- __attribute__((__unused__)),
- unsigned int bufferSize) {
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
- handle->bufferSize = bufferSize;
- handle->maxNofBuffer = maxNofBuffers;
+ handle->bufferSize = size;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+ return 0;
+}
+
+//
+// Set Maximum message size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->maxMsgSize = size;
celixThreadRwlock_unlock(&handle->dbLock);
}
return 0;
@@ -708,7 +701,7 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
if (prio > 0 && prio < 100) {
struct sched_param sch;
bzero(&sch, sizeof(struct sched_param));
- sch.sched_priority = prio;
+ sch.sched_priority = (int)prio;
pthread_setschedparam(handle->thread.thread, policy, &sch);
} else {
L_INFO("Skipping configuration of thread prio to %i and thread "
@@ -752,35 +745,23 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
}
}
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
- int expectedReadSize = size;
- int nbytes = size;
- int msgSize = 0;
- char* buffer = (char*)_buffer;
- while (nbytes > 0 && expectedReadSize > 0) {
- // Read the message header
- nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
- // Update buffer administration
- offset += nbytes;
- expectedReadSize -= nbytes;
- msgSize += nbytes;
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->enableReceiveEvent = enable;
+ celixThreadRwlock_unlock(&handle->dbLock);
}
- if (nbytes <=0) msgSize = nbytes;
- return msgSize;
}
-
static inline
void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
if (entry->header.header.payloadSize > 0) {
- handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
+ handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
}
if (entry->header.header.metadataSize > 0) {
- handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
- entry->header.header.metadataSize, &entry->header);
- entry->metaBufferSize = entry->header.header.metadataSize;
+ handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer,
+ entry->header.header.metadataSize, &entry->header);
}
if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
struct timespec receiveTime;
@@ -791,13 +772,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
}
}
-
//
// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and size have valid values
//
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
- celixThreadRwlock_writeLock(&handle->dbLock);
+ celixThreadRwlock_readLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
if (entry == NULL)
entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -811,125 +791,157 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
celixThreadRwlock_unlock(&handle->dbLock);
return -1;
}
+ int nofBytesInReadBuffer = 0;
+ if (ioctl(fd, FIONREAD, &nofBytesInReadBuffer)) {
+ L_ERROR("[TCP Socket] socket: %d, url: %s, cannot read nof bytes in socket read buffer \n", entry->fd, entry->url);
+ }
+ // if socket buffer is not filled, return out of function
+ if (!nofBytesInReadBuffer) {
+ celixThreadRwlock_unlock(&handle->dbLock);
+ return 1;
+ }
+ celixThreadMutex_lock(&entry->readMutex);
+ // When header is included in payload buffer, allocate buffer.
+ // bufferSize is at least the header size
+ if ((entry->buffer == NULL) && (entry->readHeaderBufferSize == 0)) {
+ entry->buffer = malloc((size_t) handle->bufferSize);
+ }
+ struct msghdr msg;
+ struct iovec msg_iov[IOV_MAX];
+ memset(&msg, 0x00, sizeof(struct msghdr));
+ msg.msg_iov = msg_iov;
- // Message buffer is to small, reallocate to make it bigger
- if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
- handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
- if (entry->buffer) free(entry->buffer);
- entry->buffer = malloc((size_t) handle->bufferSize);
- entry->bufferSize = handle->bufferSize;
- }
// Read the message
- bool validMsg = false;
- char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
- int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
- if (nbytes > 0) {
- // Check header message buffer
- if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
+ msg.msg_iovlen = 0;
+ msg.msg_iov[msg.msg_iovlen].iov_base = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer : entry->buffer;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->headerSize;
+ msg.msg_iovlen++;
+ long int msgSize = 0;
+ long int nbytes = recvmsg(fd, &msg, MSG_PEEK | MSG_NOSIGNAL);
+ if (nbytes >= entry->headerSize) {
+ msg.msg_iovlen--;
+ if (handle->protocol->decodeHeader(handle->protocol->handle,
+ msg.msg_iov[msg.msg_iovlen].iov_base,
+ msg.msg_iov[msg.msg_iovlen].iov_len,
+ &entry->header) != CELIX_SUCCESS) {
// Did not receive correct header
// skip sync word and try to read next header
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
if (!entry->headerError) {
L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
}
entry->headerError = true;
- entry->bufferReadSize = 0;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->syncSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
} else {
- // Read header message from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
- if ((nbytes > 0) && (nbytes == entry->headerSize)) {
- entry->headerError = false;
- // For headerless message, add header to bufferReadSize;
- if (!entry->headerBufferSize)
- entry->bufferReadSize += nbytes;
- // Alloc message buffers
- if (entry->header.header.payloadSize > entry->bufferSize) {
- handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
- if (entry->buffer)
- free(entry->buffer);
- entry->buffer = malloc((size_t) handle->bufferSize);
+ // Alloc message buffers
+ if (entry->header.header.payloadSize > entry->bufferSize) {
+ handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+ char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
+ if (buffer) {
+ entry->buffer = buffer;
entry->bufferSize = handle->bufferSize;
}
- if (entry->header.header.metadataSize > entry->metaBufferSize) {
- if (entry->metaBuffer) {
- free(entry->metaBuffer);
- }
- entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
- entry->metaBufferSize = entry->header.header.metadataSize;
- L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
- entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
- }
+ }
- if (entry->header.header.payloadSize) {
- unsigned int offset = entry->header.header.payloadOffset;
- unsigned int size = entry->header.header.payloadPartSize;
- // For header less messages adjust offset and msg size;
- if (!entry->headerBufferSize) {
- offset = entry->headerSize;
- size -= offset;
- }
- // Read payload data from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
- if (nbytes > 0) {
- if (nbytes == size) {
- entry->bufferReadSize += nbytes;
- } else {
- entry->bufferReadSize = 0;
- L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
- }
- }
- }
- if (nbytes > 0 && entry->header.header.metadataSize) {
- // Read meta data from queue
- unsigned int size = entry->header.header.metadataSize;
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
- if ((nbytes > 0) && (nbytes != size)) {
- L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
- }
- }
- // Check for end of message using, footer of message. Because of streaming protocol
- if (nbytes > 0) {
- if (entry->footerSize > 0) {
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
- if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
- // valid footer, this means that the message is valid
- validMsg = true;
- } else {
- // Did not receive correct header
- L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
- entry->bufferReadSize = 0;
- }
- } else {
- // No Footer, then complete message is received
- validMsg = true;
+#ifdef USE_TCP_PUB_SUB_BUFFER_MEMSET
+ // For Debugging
+ if ((entry->header.header.payloadOffset == 0 ) && (msgSize == entry->headerSize)) {
+ memset(entry->buffer, 0x00, entry->bufferSize);
+ }
+#endif
+ entry->headerError = false;
+ if (entry->readHeaderBufferSize) {
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ if (entry->header.header.payloadPartSize) {
+ char* buffer = entry->buffer;
+ msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset];
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->header.header.payloadPartSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ if (entry->header.header.metadataSize) {
+ if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
+ char *buffer = realloc(entry->readMetaBuffer, (size_t) entry->header.header.metadataSize);
+ if (buffer) {
+ entry->readMetaBuffer = buffer;
+ entry->readMetaBufferSize = entry->header.header.metadataSize;
+ L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
+ entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize);
}
}
+ msg.msg_iov[msg.msg_iovlen].iov_base = entry->readMetaBuffer;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->header.header.metadataSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
}
+ if (entry->readFooterSize) {
+ msg.msg_iov[msg.msg_iovlen].iov_base = entry->readFooterBuffer;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->readFooterSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ }
+ // If there is data in buffer, read the data. using blocking read
+ nbytes = recvmsg(fd, &msg, MSG_NOSIGNAL | MSG_WAITALL);
+ if (nbytes < msgSize) {
+ L_ERROR("[TCP Socket] socket: %d, url: %s, read nbytes %d != msgSize %d\n", entry->fd, entry->url, (int)nbytes, (int)msgSize);
}
}
- if (nbytes > 0) {
- entry->retryCount = 0;
- // Check if complete message is received
- if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
- entry->bufferReadSize = 0;
+ if ((nbytes >= msgSize)&&(!entry->headerError)) {
+ bool valid = true;
+ if (entry->readFooterSize) {
+ if (handle->protocol->decodeFooter(handle->protocol->handle,
+ entry->readFooterBuffer,
+ entry->readFooterSize,
+ &entry->header) != CELIX_SUCCESS) {
+
+ // Did not receive correct footer
+ L_ERROR(
+ "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)",
+ entry->header.header.seqNr,
+ entry->fd,
+ entry->url);
+ valid = false;
+ }
+ }
+ if (!entry->header.header.isLastSegment) {
+ // Not last Segment of message
+ valid = false;
+ }
+
+ if (valid) {
+ // Complete message is received
pubsub_tcpHandler_decodePayload(handle, entry);
}
- } else {
- if (entry->retryCount < handle->maxRcvRetryCount) {
+ }
+
+ if (nbytes > 0) {
+ entry->retryCount = 0;
+ } else if (nbytes < 0) {
+ if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+ // Non blocking interrupt
+ entry->retryCount = 0;
+ } else if (entry->retryCount < handle->maxRcvRetryCount) {
entry->retryCount++;
- L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
- strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+ L_WARN(
+ "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
+ entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
} else {
L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
entry->fd, handle->maxRcvRetryCount, strerror(errno));
nbytes = 0; //Return 0 as indicator to close the connection
}
+ } else {
+ L_WARN("[TCP Socket] No message received (fd: %d), error(%d): %s, nbytes : %d", entry->fd, errno, strerror(errno), (int)nbytes);
}
+ celixThreadMutex_unlock(&entry->readMutex);
celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
+ return (int)nbytes;
}
-
int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
int result = 0;
@@ -964,58 +976,23 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
return result;
}
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
- int nbytes = 0;
- int msgSize = 0;
- if (entry->fd >= 0 && size && msg->msg_iovlen) {
- int expectedReadSize = size;
- unsigned int offset = 0;
- nbytes = size;
- while (nbytes > 0 && expectedReadSize > 0) {
- // Read the message header
- nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
- // Update admin
- expectedReadSize -= nbytes;
- msgSize += nbytes;
- // Not all written
- if (expectedReadSize && nbytes > 0) {
- unsigned int readSize = 0;
- unsigned int readIndex = 0;
- unsigned int i = 0;
- for (i = 0; i < msg->msg_iovlen; i++) {
- if (nbytes < msg->msg_iov[i].iov_len) {
- readIndex = i;
- break;
- }
- readSize+= msg->msg_iov[i].iov_len;
- }
- msg->msg_iov = &msg->msg_iov[readIndex];
- msg->msg_iovlen -= readIndex;
- char* buffer = (char*)msg->msg_iov->iov_base;
- offset = nbytes - readSize;
- msg->msg_iov->iov_base = &buffer[offset];
- msg->msg_iov->iov_len = msg->msg_iov->iov_len - offset;
- }
- }
- }
- if (nbytes <=0) msgSize = nbytes;
- return msgSize;
-}
+
//
// Write large data to TCP. .
//
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
size_t msg_iov_len, int flags) {
- celixThreadRwlock_readLock(&handle->dbLock);
int result = 0;
int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
int nofConnToClose = 0;
if (handle) {
+ celixThreadRwlock_readLock(&handle->dbLock);
hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
+ size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->connected) continue;
+ celixThreadMutex_lock(&entry->writeMutex);
void *payloadData = NULL;
size_t payloadSize = 0;
if (msg_iov_len == 1) {
@@ -1025,6 +1002,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
payloadSize += msgIoVec[i].iov_len;
}
}
+ // When maxMsgSize is zero then payloadSize is disabled
+ if (!entry->maxMsgSize) payloadSize = 0;
+
message->header.convertEndianess = 0;
message->header.payloadSize = payloadSize;
message->header.payloadPartSize = payloadSize;
@@ -1034,130 +1014,174 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
void *metadataData = NULL;
size_t metadataSize = 0;
if (message->metadata.metadata) {
- metadataData = entry->metaBuffer;
+ metadataSize = entry->writeMetaBufferSize;
+ metadataData = entry->writeMetaBuffer;
handle->protocol->encodeMetadata(handle->protocol->handle, message,
&metadataData,
&metadataSize);
- entry->metaBufferSize = metadataSize;
}
+ // When maxMsgSize is smaller then meta data is disabled
+ if (!entry->maxMsgSize || (metadataSize > entry->maxMsgSize)) metadataSize = 0;
+
message->header.metadataSize = metadataSize;
+ size_t totalMessageSize = payloadSize + metadataSize;
+
+ bool isMessageSegmentationSupported = false;
+ handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported);
+ if (((!isMessageSegmentationSupported) && (msg_iov_len > max_msg_iov_len)) ||
+ ((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) {
+ L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n",
+ entry->fd);
+ celixThreadMutex_unlock(&entry->writeMutex);
+ continue;
+ }
void *footerData = NULL;
size_t footerDataSize = 0;
- if (entry->footerSize) {
- footerData = entry->footerBuffer;
+ if (entry->writeFooterSize) {
+ footerDataSize = entry->writeFooterSize;
+ footerData = entry->writeFooterBuffer;
handle->protocol->encodeFooter(handle->protocol->handle, message,
&footerData,
&footerDataSize);
- entry->footerSize = footerDataSize;
+ entry->writeFooterSize = MAX(footerDataSize, entry->writeFooterSize);
+ if (footerData && entry->writeFooterBuffer != footerData) entry->writeFooterBuffer = footerData;
}
size_t msgSize = 0;
- struct msghdr msg;
- struct iovec msg_iov[IOV_MAX];
- memset(&msg, 0x00, sizeof(struct msghdr));
- msg.msg_name = &entry->addr;
- msg.msg_namelen = entry->len;
- msg.msg_flags = flags;
- msg.msg_iov = msg_iov;
+ size_t msgPayloadSize = 0;
+ size_t msgMetaDataSize = 0;
+ size_t msgIovLen = 0;
+ long int nbytes = UINT32_MAX;
+ while (msgSize < totalMessageSize && nbytes > 0) {
+ struct msghdr msg;
+ struct iovec msg_iov[IOV_MAX];
+ memset(&msg, 0x00, sizeof(struct msghdr));
+ msg.msg_name = &entry->addr;
+ msg.msg_namelen = entry->len;
+ msg.msg_flags = flags;
+ msg.msg_iov = msg_iov;
+ size_t msgPartSize = 0;
+ message->header.payloadPartSize = 0;
+ message->header.payloadOffset = 0;
+ message->header.metadataSize = 0;
+ message->header.isLastSegment = 0;
+
+ // Write generic seralized payload in vector buffer
+ if (msgPayloadSize < payloadSize) {
+ if (payloadSize && payloadData && entry->maxMsgSize) {
+ char *buffer = payloadData;
+ msg.msg_iovlen++;
+ msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadSize];
+ msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadSize), entry->maxMsgSize);
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ message->header.payloadPartSize = msgPartSize;
+ message->header.payloadOffset = msgPayloadSize;
+ msgPayloadSize += message->header.payloadPartSize;
+ msgSize = msgPayloadSize;
+ } else {
+ // copy serialized vector into vector buffer
+ for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
+ msg.msg_iovlen++;
+ msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base;
+ msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
+ if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) {
+ msg.msg_iovlen--;
+ break;
+ }
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ }
+ msgIovLen += msg.msg_iovlen;
+ message->header.payloadOffset = msgPayloadSize;
+ message->header.payloadPartSize = msgPartSize;
+ msgPayloadSize += message->header.payloadPartSize;
+ msgSize = msgPayloadSize;
+ }
+ }
- // Write generic seralized payload in vector buffer
- if (payloadSize && payloadData) {
- msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
- msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
- } else {
- // copy serialized vector into vector buffer
- for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+ // Write optional metadata in vector buffer
+ if ((msgPayloadSize >= payloadSize) &&
+ (msgMetaDataSize < metadataSize) &&
+ (msgPartSize < entry->maxMsgSize) &&
+ (msg.msg_iovlen+1 < max_msg_iov_len-1) &&
+ (metadataSize && metadataData)) {
msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
- msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+ msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ message->header.metadataSize = metadataSize;
+ msgSize += metadataSize;
+ }
+ if (msgSize >= totalMessageSize) {
+ message->header.isLastSegment = 0x1;
}
- }
-
- // Write optional metadata in vector buffer
- if (metadataSize && metadataData) {
- msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
- msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
- }
- // Write optional footerData in vector buffer
- if (footerData && footerDataSize) {
- msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
- msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
- }
+ // Write optional footerData in vector buffer
+ if (footerData && footerDataSize) {
+ msg.msg_iovlen++;
+ msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+ msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ }
- void *headerData = NULL;
- size_t headerSize = 0;
- // check if header is not part of the payload (=> headerBufferSize = 0)s
- if (entry->headerBufferSize) {
- headerData = entry->headerBuffer;
- // Encode the header, with payload size and metadata size
- handle->protocol->encodeHeader(handle->protocol->handle, message,
- &headerData,
- &headerSize);
- entry->headerBufferSize = headerSize;
- }
- if (!entry->headerBufferSize) {
- // Skip header buffer, when header is part of payload;
- msg.msg_iov = &msg_iov[1];
- } else if (headerSize && headerData) {
- // Write header in 1st vector buffer item
- msg.msg_iov[0].iov_base = headerData;
- msg.msg_iov[0].iov_len = headerSize;
- msgSize += msg.msg_iov[0].iov_len;
- msg.msg_iovlen++;
- } else {
- L_ERROR("[TCP Socket] No header buffer is generated");
- msg.msg_iovlen = 0;
- }
- long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
- // When a specific socket keeps reporting errors can indicate a subscriber
- // which is not active anymore, the connection will remain until the retry
- // counter exceeds the maximum retry count.
- // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
- if (nbytes == -1) {
- if (entry->retryCount < handle->maxSendRetryCount) {
- entry->retryCount++;
- L_ERROR(
- "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
- entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+ void *headerData = NULL;
+ size_t headerSize = entry->writeHeaderBufferSize;
+ // check if header is not part of the payload (=> headerBufferSize = 0)s
+ if (entry->writeHeaderBufferSize) {
+ headerSize = entry->writeHeaderBufferSize;
+ headerData = entry->writeHeaderBuffer;
+ // Encode the header, with payload size and metadata size
+ handle->protocol->encodeHeader(handle->protocol->handle, message,
+ &headerData,
+ &headerSize);
+ entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+ if (headerData && entry->writeHeaderBuffer != headerData) entry->writeHeaderBuffer = headerData;
+ }
+ if (!entry->writeHeaderBufferSize) {
+ // Skip header buffer, when header is part of payload;
+ msg.msg_iov = &msg_iov[1];
+ } else if (headerSize && headerData) {
+ // Write header in 1st vector buffer item
+ msg.msg_iov[0].iov_base = headerData;
+ msg.msg_iov[0].iov_len = headerSize;
+ msgPartSize += msg.msg_iov[0].iov_len;
+ msg.msg_iovlen++;
} else {
- L_ERROR(
- "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
- entry->fd, handle->maxSendRetryCount, strerror(errno));
- connFdCloseQueue[nofConnToClose++] = entry->fd;
+ L_ERROR("[TCP Socket] No header buffer is generated");
+ msg.msg_iovlen = 0;
}
- result = -1; //At least one connection failed sending
- } else if (msgSize) {
- entry->retryCount = 0;
- if (nbytes != msgSize) {
- L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
+ nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+ // When a specific socket keeps reporting errors can indicate a subscriber
+ // which is not active anymore, the connection will remain until the retry
+ // counter exceeds the maximum retry count.
+ // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+ if (nbytes == -1) {
+ if (entry->retryCount < handle->maxSendRetryCount) {
+ entry->retryCount++;
+ L_ERROR(
+ "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+ entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+ } else {
+ L_ERROR(
+ "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+ connFdCloseQueue[nofConnToClose++] = entry->fd;
+ }
+ result = -1; //At least one connection failed sending
+ } else if (msgPartSize) {
+ entry->retryCount = 0;
+ if (nbytes != msgPartSize) {
+ L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
+ }
+ }
+ // Note: serialized Payload is deleted by serializer
+ if (payloadData && (payloadData != message->payload.payload)) {
+ free(payloadData);
}
}
- // Release data
- if (headerData && headerData != entry->headerBuffer) {
- free(headerData);
- }
- // Note: serialized Payload is deleted by serializer
- if (payloadData && (payloadData != message->payload.payload)) {
- free(payloadData);
- }
- if (metadataData && metadataData != entry->metaBuffer) {
- free(metadataData);
- }
- if (footerData && footerData != entry->footerBuffer) {
- free(footerData);
- }
+ celixThreadMutex_unlock(&entry->writeMutex);
}
+ celixThreadRwlock_unlock(&handle->dbLock);
}
- celixThreadRwlock_unlock(&handle->dbLock);
//Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
for (int i = 0; i < nofConnToClose; i++) {
pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1169,12 +1193,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
// get interface URL
//
char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
- hash_map_iterator_t interface_iter =
+ hash_map_iterator_t iter =
hashMapIterator_construct(handle->interface_url_map);
char *url = NULL;
- while (hashMapIterator_hasNext(&interface_iter)) {
+ while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry =
- hashMapIterator_nextValue(&interface_iter);
+ hashMapIterator_nextValue(&iter);
if (entry && entry->url) {
if (!url) {
url = celix_utils_strdup(entry->url);
@@ -1187,6 +1211,32 @@ char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
}
return url;
}
+//
+// get interface URL
+//
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+ hash_map_iterator_t iter =
+ hashMapIterator_construct(handle->connection_url_map);
+ char *url = NULL;
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_tcp_connection_entry_t *entry =
+ hashMapIterator_nextValue(&iter);
+ if (entry && entry->url) {
+ if (!url) {
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+ url = celix_utils_strdup(url_info->interface_url ? url_info->interface_url : entry->url);
+ pubsub_utils_url_free(url_info);
+ } else {
+ char *tmp = url;
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+ asprintf(&url, "%s %s", tmp, url_info->interface_url ? url_info->interface_url : entry->url);
+ pubsub_utils_url_free(url_info);
+ free(tmp);
+ }
+ }
+ }
+ return url;
+}
//
// Handle non-blocking accept (sender)
@@ -1216,7 +1266,8 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
#else
struct epoll_event event;
bzero(&event, sizeof(event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+ event.events = EPOLLRDHUP | EPOLLERR;
+ if (handle->enableReceiveEvent) event.events |= EPOLLIN;
event.data.fd = entry->fd;
// Register Read to epoll
rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..2d97634 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,15 +58,14 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd);
int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
- unsigned int maxNofBuffers,
- unsigned int bufferSize);
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle, bool enable);
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
@@ -86,6 +85,7 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle,
pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback);
char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle);
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle);
void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, const char *sched);
void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle, const char *topic, const char *scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 4fa4586..985af6b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -24,20 +24,17 @@
#include <pubsub/subscriber.h>
#include <memory.h>
#include <pubsub_constants.h>
-#include <assert.h>
-#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
-#include <math.h>
#include "pubsub_tcp_handler.h"
#include "pubsub_tcp_topic_receiver.h"
#include "pubsub_psa_tcp_constants.h"
#include "pubsub_tcp_common.h"
-#include "celix_utils_api.h"
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
#include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
#include <celix_api.h>
#define MAX_EPOLL_EVENTS 16
@@ -65,8 +62,10 @@ struct pubsub_tcp_topic_receiver {
char *topic;
size_t timeout;
bool metricsEnabled;
+ bool isPassive;
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_interceptors_handler_t *interceptorsHandler;
struct {
celix_thread_t thread;
@@ -118,24 +117,14 @@ typedef struct psa_tcp_subscriber_entry {
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void *psa_tcp_recvThread(void *data);
-
static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-
static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
-
static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime);
-
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-
static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -143,7 +132,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *serializer,
long protocolSvcId,
@@ -157,52 +146,43 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
- bool isServerEndPoint = false;
-
- /* Check if it's a static endpoint */
- const char *staticClientEndPointUrls = NULL;
- const char *staticServerEndPointUrls = NULL;
- const char *staticConnectUrls = NULL;
-
- staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
+ pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+ const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
+ const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+ const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
if (topicProperties != NULL) {
if(staticConnectUrls == NULL) {
staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
}
- const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
- if (endPointType != NULL) {
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
- staticClientEndPointUrls = staticConnectUrls;
- }
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
- staticServerEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL);
- isServerEndPoint = true;
- }
+ if (isPassive == NULL) {
+ isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+ }
+ if (passiveKey == NULL) {
+ passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
}
}
+ receiver->isPassive = psa_tcp_isPassive(isPassive);
+
// Set receiver connection thread timeout.
// property is in ms, timeout value in us. (convert ms to us).
receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
/* When it's an endpoint share the socket with the sender */
- if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
- celixThreadMutex_lock(&endPointStore->mutex);
- const char *endPointUrl = (staticServerEndPointUrls) ? staticServerEndPointUrls : staticClientEndPointUrls;
- pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+ if (passiveKey != NULL) {
+ celixThreadMutex_lock(&handlerStore->mutex);
+ pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (receiver->socketHandler == NULL)
receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
entry = receiver->socketHandler;
receiver->sharedSocketHandler = receiver->socketHandler;
- hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+ hashMap_put(handlerStore->map, (void *) passiveKey, entry);
} else {
receiver->socketHandler = entry;
receiver->sharedSocketHandler = entry;
}
- celixThreadMutex_unlock(&endPointStore->mutex);
+ celixThreadMutex_unlock(&handlerStore->mutex);
} else {
receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
}
@@ -213,14 +193,12 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
- long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
- PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
- long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
+ long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+
pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
- pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions,
- (unsigned int) buffer_size);
+ pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
@@ -230,8 +208,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
}
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
- PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+ PSA_TCP_DEFAULT_METRICS_ENABLED);
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->thread.mutex, NULL);
@@ -239,7 +216,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticServerEndPointUrls == NULL)) {
+ if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (!receiver->isPassive)) {
char *urlsCopy = strndup(staticConnectUrls, 1024 * 1024);
char *url;
char *save = urlsCopy;
@@ -255,7 +232,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
free(urlsCopy);
}
- if (receiver->socketHandler != NULL && (!isServerEndPoint)) {
+ if (receiver->socketHandler != NULL && (!receiver->isPassive)) {
// Configure Receiver thread
receiver->thread.running = true;
celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
@@ -346,7 +323,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
pubsub_tcpHandler_destroy(receiver->socketHandler);
receiver->socketHandler = NULL;
}
-
+ pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
if (receiver->scope != NULL) {
free(receiver->scope);
}
@@ -374,6 +351,17 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ if (receiver->isPassive) {
+ char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
+ char *url = NULL;
+ asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
+ if (interface_url) {
+ celix_arrayList_add(connectedUrls, url);
+ } else {
+ celix_arrayList_add(unconnectedUrls, url);
+ }
+ free(interface_url);
+ } else {
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
@@ -383,11 +371,16 @@ void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
celix_arrayList_add(connectedUrls, url);
} else {
celix_arrayList_add(unconnectedUrls, url);
+ }
}
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver) {
+ return receiver->isPassive;
+}
+
void pubsub_tcpTopicReceiver_connectTo(
pubsub_tcp_topic_receiver_t *receiver,
const char *url) {
@@ -466,8 +459,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
- &entry->msgTypes);
+ int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes);
if (rc == 0) {
entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
@@ -498,7 +490,6 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
-
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
@@ -559,31 +550,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
}
if (status == CELIX_SUCCESS) {
- hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+ const char *msgType = msgSer->msgName;
+ uint32_t msgId = message->header.msgId;
+ celix_properties_t *metadata = message->metadata.metadata;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
bool release = true;
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
- &release);
- if (!release && hashMapIterator_hasNext(&iter)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
- receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- break;
+ if (cont) {
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+ if (!release && hashMapIterator_hasNext(&iter)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
+ msgSer->msgName,
+ receiver->scope == NULL ? "(null)" : receiver->scope,
+ receiver->topic);
+ break;
+ }
+ release = true;
}
- release = true;
}
+ if (release) {
+ msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+ }
+ if (message->metadata.metadata) {
+ celix_properties_destroy(message->metadata.metadata);
+ }
+ updateReceiveCount += 1;
}
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
- }
- if (message->metadata.metadata) {
- celix_properties_destroy(message->metadata.metadata);
- }
- updateReceiveCount += 1;
} else {
updateSerError += 1;
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
@@ -650,10 +649,7 @@ static void *psa_tcp_recvThread(void *data) {
pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope,
- PUBSUB_AMDIN_METRICS_NAME_MAX,
- "%s",
- receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
int msgTypesCount = 0;
@@ -677,8 +673,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
while (hashMapIterator_hasNext(&iter2)) {
hash_map_t *origins = hashMapIterator_nextValue(&iter2);
- result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins),
- sizeof(*(result->msgTypes[i].origins)));
+ result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
int k = 0;
hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
@@ -694,10 +689,8 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
- result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
- metrics->averageTimeBetweenMessagesInSeconds;
- result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
- metrics->averageSerializationTimeInSeconds;
+ result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+ result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
@@ -707,7 +700,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
metrics->msgTypeId);
}
}
- i += 1;
+ i +=1 ;
}
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -721,7 +714,7 @@ static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
- if ((entry) && (!entry->connected)) {
+ if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
if (rc < 0) {
allConnected = false;
@@ -812,12 +805,11 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t
int versionMajor;
int versionMinor;
- if (msgVersion != NULL) {
+ if (msgVersion!=NULL) {
version_getMajor(msgVersion, &versionMajor);
version_getMinor(msgVersion, &versionMinor);
- if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */
- check = (minor >=
- ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+ if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+ check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
}
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
index 50d5a97..118bf11 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
@@ -32,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *serializer,
long protocolSvcId,
@@ -47,6 +47,7 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver,
celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls);
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index dbb5e26..105dcf1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -35,8 +35,8 @@
#include "celix_constants.h"
#include <signal.h>
#include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
-#define FIRST_SEND_DELAY_IN_SECONDS 2
#define TCP_BIND_MAX_RETRY 10
#define L_DEBUG(...) \
@@ -59,13 +59,15 @@ struct pubsub_tcp_topic_sender {
bool metricsEnabled;
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_interceptors_handler_t *interceptorsHandler;
char *scope;
char *topic;
char *url;
bool isStatic;
-
+ bool isPassive;
bool verbose;
+ unsigned long send_delay;
struct {
long svcId;
@@ -128,7 +130,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *ser,
long protocolSvcId,
@@ -144,52 +146,43 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
- PSA_TCP_DEFAULT_METRICS_ENABLED);
- bool isEndpoint = false;
+ pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+ sender->isPassive = false;
+ sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
char *urls = NULL;
const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
- const char *discUrl = NULL;
- const char *staticClientEndPointUrls = NULL;
- const char *staticServerEndPointUrls = NULL;
-
- discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+ const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+ const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+ const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
if (topicProperties != NULL) {
if (discUrl == NULL) {
discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
}
- /* Check if it's a static endpoint */
- const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
- if (endPointType != NULL) {
- isEndpoint = true;
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
- staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
- }
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
- staticServerEndPointUrls = discUrl;
- }
+ if (isPassive == NULL) {
+ isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+ }
+ if (passiveKey == NULL) {
+ passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
}
}
+ sender->isPassive = psa_tcp_isPassive(isPassive);
/* When it's an endpoint share the socket with the receiver */
- if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
- celixThreadMutex_lock(&endPointStore->mutex);
- const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls : staticServerEndPointUrls;
- pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+ if (passiveKey != NULL) {
+ celixThreadMutex_lock(&handlerStore->mutex);
+ pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (sender->socketHandler == NULL)
sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
entry = sender->socketHandler;
sender->sharedSocketHandler = sender->socketHandler;
- hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+ hashMap_put(handlerStore->map, (void *) passiveKey, entry);
} else {
sender->socketHandler = entry;
sender->sharedSocketHandler = entry;
}
- celixThreadMutex_unlock(&endPointStore->mutex);
+ celixThreadMutex_unlock(&handlerStore->mutex);
} else {
sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
}
@@ -197,66 +190,64 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
- long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
- PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
- double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
- (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
- PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+ long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+ double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+ long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+ long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+ sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SEND_DELAY, PSA_TCP_DEFAULT_SEND_DELAY);
pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
- pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+ pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+ pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+ pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+ pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
}
- //setting up tcp socket for TCP TopicSender
- if (staticClientEndPointUrls != NULL) {
- // Store url for client static endpoint
- sender->url = strndup(staticClientEndPointUrls, 1024 * 1024);
- sender->isStatic = true;
- } else if (discUrl != NULL) {
- urls = strndup(discUrl, 1024 * 1024);
- sender->isStatic = true;
- } else if (ip != NULL) {
- urls = strndup(ip, 1024 * 1024);
- } else {
- struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
- urls = pubsub_utils_url_get_url(sin, NULL);
- free(sin);
- }
- if (!sender->url) {
- char *urlsCopy = strndup(urls, 1024 * 1024);
- char *url;
- char *save = urlsCopy;
- while ((url = strtok_r(save, " ", &save))) {
- int retry = 0;
- while (url && retry < TCP_BIND_MAX_RETRY) {
- pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
- int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
- if (rc < 0) {
- L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
- } else {
- url = NULL;
+ if (!sender->isPassive) {
+ //setting up tcp socket for TCP TopicSender
+ if (discUrl != NULL) {
+ urls = strndup(discUrl, 1024 * 1024);
+ sender->isStatic = true;
+ } else if (ip != NULL) {
+ urls = strndup(ip, 1024 * 1024);
+ } else {
+ struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+ urls = pubsub_utils_url_get_url(sin, NULL);
+ free(sin);
+ }
+ if (!sender->url) {
+ char *urlsCopy = strndup(urls, 1024 * 1024);
+ char *url;
+ char *save = urlsCopy;
+ while ((url = strtok_r(save, " ", &save))) {
+ int retry = 0;
+ while (url && retry < TCP_BIND_MAX_RETRY) {
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+ int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+ if (rc < 0) {
+ L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
+ } else {
+ url = NULL;
+ }
+ pubsub_utils_url_free(urlInfo);
+ retry++;
}
- pubsub_utils_url_free(urlInfo);
- retry++;
}
+ free(urlsCopy);
+ sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
}
- free(urlsCopy);
- sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
- }
- if (urls)
free(urls);
+ }
- if (sender->url != NULL) {
+ //register publisher services using a service factory
+ if ((sender->url != NULL) || (sender->isPassive)) {
sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
- }
- //register publisher services using a service factory
- if (sender->url != NULL) {
sender->publisher.factory.handle = sender;
sender->publisher.factory.getService = psa_tcp_getPublisherService;
sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
@@ -274,9 +265,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
opts.properties = props;
sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
- }
-
- if (sender->url == NULL) {
+ } else {
free(sender);
sender = NULL;
}
@@ -312,6 +301,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
celixThreadMutex_unlock(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->boundedServices.mutex);
+ pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
pubsub_tcpHandler_destroy(sender->socketHandler);
sender->socketHandler = NULL;
@@ -343,13 +333,20 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
}
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
- return sender->url;
+ if (sender->isPassive) {
+ return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+ } else {
+ return sender->url;
+ }
}
-
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
return sender->isStatic;
}
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+ return sender->isPassive;
+}
+
void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
//TODO subscriber count -> topic info
}
@@ -483,8 +480,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se
result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
- mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+ result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
result->msgMetrics[i].bndId = entry->bndId;
result->msgMetrics[i].typeId = mEntry->type;
@@ -533,7 +529,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
clock_gettime(CLOCK_REALTIME, &serializationEnd);
}
- if (status == CELIX_SUCCESS /*ser ok*/) {
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+ if (status == CELIX_SUCCESS /*ser ok*/ && cont) {
pubsub_protocol_message_t message;
message.metadata.metadata = NULL;
message.payload.payload = NULL;
@@ -555,12 +552,12 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
entry->seqNr++;
bool sendOk = true;
{
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
- serializedIoVecOutputLen, 0);
+ int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
if (rc < 0) {
status = -1;
sendOk = false;
}
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
if (message.metadata.metadata)
celix_properties_destroy(message.metadata.metadata);
if (serializedIoVecOutput) {
@@ -617,8 +614,8 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
static bool firstSend = true;
if (firstSend) {
- L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
- sleep(FIRST_SEND_DELAY_IN_SECONDS);
+ if (sender->send_delay ) L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+ usleep(sender->send_delay * 1000);
firstSend = false;
}
-}
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
index 2217989..16438c4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
@@ -33,28 +33,21 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *ser,
long protocolSvcId,
pubsub_protocol_service_t *prot);
void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender);
-
const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender);
-
const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender);
-
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
-
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
-
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
-
long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-
void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
-
void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
/**
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 4b4ad90..bbbcabf 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -39,7 +39,7 @@
#include "pubsub_admin.h"
#include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS 30L
+#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME 250 // 250 msecond
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -79,7 +79,7 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce
manager->loghelper = logHelper;
manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
- manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS);
+ manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME);
manager->psaHandling.running = true;
celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
@@ -1108,7 +1108,7 @@ static void *pstm_psaHandlingThread(void *data) {
pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
celixThreadMutex_lock(&manager->psaHandling.mutex);
- celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime, 0L);
+ celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime / 1000, (manager->handlingThreadSleepTime % 1000) * 1000000);
running = manager->psaHandling.running;
celixThreadMutex_unlock(&manager->psaHandling.mutex);
}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 842d861..4223433 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -31,7 +31,7 @@
#include "pubsub/subscriber.h"
#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"
+#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_KEY "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME"
#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index 87d4263..b10863c 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -28,16 +28,16 @@ typedef struct pubsub_utils_url {
char *url;
char *protocol;
char *hostname;
- unsigned int portnr;
+ unsigned int port_nr;
char *uri;
char *interface;
- unsigned int interface_portnr;
+ unsigned int interface_port_nr;
char *interface_url;
} pubsub_utils_url_t;
struct sockaddr_in *pubsub_utils_url_from_fd(int fd);
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port);
-char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol);
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port);
+char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol);
char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
bool pubsub_utils_url_is_multicast(char *hostname);
char *pubsub_utils_url_get_multicast_ip(char *hostname);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index d8d518c..65a1ff2 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -56,7 +56,7 @@ struct sockaddr_in *pubsub_utils_url_from_fd(int fd) {
return inp;
}
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) {
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) {
struct hostent *hp;
struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in));
bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
@@ -220,11 +220,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) {
maxPortnr += 1;
unsigned int minDigits = (unsigned int) atoi(portnr);
unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
- url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+ url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
} else {
unsigned int portDigits = (unsigned int) atoi(portnr);
if (portDigits != 0)
- url_info->portnr = portDigits;
+ url_info->port_nr = portDigits;
uri = strstr(port, "/");
if ((uri) && (!url_info->uri))
url_info->uri = celix_utils_strdup(uri);
@@ -256,11 +256,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) {
maxPortnr += 1;
unsigned int minDigits = (unsigned int) atoi(portnr);
unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
- url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+ url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
} else {
unsigned int portDigits = (unsigned int) atoi(portnr);
if (portDigits != 0)
- url_info->interface_portnr = portDigits;
+ url_info->interface_port_nr = portDigits;
uri = strstr(port, "/");
if ((uri) && (!url_info->uri))
url_info->uri = celix_utils_strdup(uri);
@@ -289,13 +289,13 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
free(url_info->interface);
url_info->interface = ip;
}
- struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr);
+ struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL);
free(m_sin);
pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info);
free(url_info->interface);
url_info->interface = interface_url_info.hostname;
- url_info->interface_portnr = interface_url_info.portnr;
+ url_info->interface_port_nr = interface_url_info.port_nr;
}
if (url_info->hostname) {
@@ -306,11 +306,11 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
free(url_info->hostname);
url_info->hostname = ip;
}
- struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+ struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol);
free(url_info->hostname);
free(sin);
- url_info->portnr = 0;
+ url_info->port_nr = 0;
url_info->hostname = NULL;
pubsub_utils_url_parse_url(url_info->url, url_info);
}
@@ -338,7 +338,7 @@ void pubsub_utils_url_free(pubsub_utils_url_t *url_info) {
url_info->hostname = NULL;
url_info->protocol = NULL;
url_info->interface = NULL;
- url_info->portnr = 0;
- url_info->interface_portnr = 0;
+ url_info->port_nr = 0;
+ url_info->interface_port_nr = 0;
free(url_info);
}
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index a78958a..5c936ce 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut
add_celix_bundle(pubsub_endpoint_tst
#Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
SOURCES
- test/tst_activator.c
+ test/tst_endpoint_activator.c
VERSION 1.0.0
)
target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -47,7 +47,7 @@ celix_bundle_files(pubsub_endpoint_tst
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(pubsub_endpoint_tst
- meta_data/ping2.properties
+ meta_data/ping3.properties
DESTINATION "META-INF/topics/sub"
)
@@ -65,7 +65,7 @@ celix_bundle_files(pubsub_loopback
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(pubsub_loopback
- meta_data/pong2.properties
+ meta_data/pong3.properties
DESTINATION "META-INF/topics/pub"
)
celix_bundle_files(pubsub_loopback
@@ -153,9 +153,9 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
@@ -189,9 +189,9 @@ if (BUILD_PUBSUB_PSA_TCP)
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v2
pubsub_sut
pubsub_tst
)
@@ -211,12 +211,12 @@ if (BUILD_PUBSUB_PSA_TCP)
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v1
pubsub_loopback
- pubsub_endpoint_sut
pubsub_endpoint_tst
+ pubsub_endpoint_sut
)
target_link_libraries(pubsub_tcp_endpoint_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
target_include_directories(pubsub_tcp_endpoint_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
@@ -229,9 +229,9 @@ if (BUILD_PUBSUB_PSA_TCP)
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
@@ -286,9 +286,9 @@ if (BUILD_PUBSUB_PSA_WS)
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
@@ -319,9 +319,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v1
pubsub_sut
pubsub_tst
)
@@ -339,9 +339,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
pubsub_sut
pubsub_tst
)
@@ -361,9 +361,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
PSA_ZMQ_ZEROCOPY_ENABLED=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v1
Celix::shell
Celix::shell_tui
pubsub_sut
@@ -384,9 +384,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
PSA_ZMQ_ZEROCOPY_ENABLED=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
pubsub_sut
@@ -406,9 +406,9 @@ if (BUILD_PUBSUB_PSA_ZMQ)
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
diff --git a/bundles/pubsub/test/meta_data/ping2.properties b/bundles/pubsub/test/meta_data/ping2.properties
index 4b42836..ff0dbed 100644
--- a/bundles/pubsub/test/meta_data/ping2.properties
+++ b/bundles/pubsub/test/meta_data/ping2.properties
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
tcp.static.bind.url=tcp://localhost:9500
-tcp.static.endpoint.type=server
+tcp.passive.key=tcp://localhost:9500
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/pong2.properties b/bundles/pubsub/test/meta_data/pong2.properties
index fa55718..b95f3bc 100644
--- a/bundles/pubsub/test/meta_data/pong2.properties
+++ b/bundles/pubsub/test/meta_data/pong2.properties
@@ -14,8 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-tcp.static.connect.urls=tcp://localhost:9500;localhost:9501
-tcp.static.endpoint.type=client
+tcp.static.connect.urls=tcp://localhost:9500
+tcp.passive.key=tcp://localhost
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
index 43d8a78..c208743 100644
--- a/bundles/pubsub/test/test/loopback_activator.c
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -42,7 +42,7 @@ struct activator {
celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
char filter[512];
- snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong2");
+ snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong3");
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.set = sut_pubSet;
opts.callbackHandle = act;
@@ -86,6 +86,7 @@ static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId
msg_t *msg = voidMsg;
msg_t send_msg = *msg;
pthread_mutex_lock(&act->mutex);
+
if (act->pubSvc != NULL) {
if (act->count == 0) {
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
index 636b9a6..81e0326 100644
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.c
@@ -48,7 +48,7 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
{
celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
+ celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping3");
act->subSvc.handle = act;
act->subSvc.receive = tst_receive;
act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);