You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/08/29 08:54:37 UTC
[celix] 02/03: Fix tests
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/tcp_admin_msg_segmentation
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 954d64d02f167413ad8e68d1b9e2874afd084f53
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sat Aug 29 09:06:33 2020 +0200
Fix tests
---
CMakeLists.txt | 20 +++----
.../pubsub/pubsub_admin_tcp/src/psa_activator.c | 4 +-
.../src/pubsub_psa_tcp_constants.h | 3 ++
.../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 30 ++++++-----
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 29 ++++++++--
.../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 1 +
.../src/pubsub_tcp_topic_receiver.c | 61 ++++++----------------
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 6 +--
bundles/pubsub/test/CMakeLists.txt | 4 +-
bundles/pubsub/test/test/sut_endpoint_activator.c | 2 -
10 files changed, 79 insertions(+), 81 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a298fdd..8b7a406 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,6 +32,16 @@ IF (${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION} EQUAL 3.3 AND ${CMAKE_GENERATO
message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not supported due to a bug in the Makefile Generator (see Bug 15696). Please change the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use a different generator (e.g. Ninja)." )
ENDIF()
+# Options
+option(ENABLE_TESTING "Enables unit/bundle testing" TRUE)
+if (ENABLE_TESTING)
+ find_package(GTest CONFIG QUIET)
+ if (NOT GTest_FOUND)
+ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake)
+ endif()
+ enable_testing()
+endif ()
+
set(ENABLE_MORE_WARNINGS OFF)
# Set C specific flags
@@ -113,16 +123,6 @@ set(CELIX_MICRO "1")
# Default bundle version
set(DEFAULT_VERSION 1.0.0)
-# Options
-option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
-if (ENABLE_TESTING)
- find_package(GTest CONFIG QUIET)
- if (NOT GTest_FOUND)
- include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake)
- endif()
- enable_testing()
-endif()
-
option(CELIX_INSTALL_DEPRECATED_API "whether to install (and use) deprecated apis (i.e. header without a celix_ prefix." ON)
option(CELIX_ADD_DEPRECATED_ATTRIBUTES "If enabled add deprecated attributes to deprecated services/functions." ON)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
index a5ae576..d93c478 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -121,9 +121,7 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp");
celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp");
- celix_properties_set(props,
- CELIX_SHELL_COMMAND_DESCRIPTION,
- "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+ celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 9f03d13..dee36e6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -65,6 +65,9 @@
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0
+#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY "PUBSUB_TCP_SUBSCRIBER_BLOCKING"
+#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT false
+
#define PUBSUB_TCP_PSA_IP_KEY "PSA_IP"
#define PUBSUB_TCP_ADMIN_TYPE "tcp"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index e7ad284..b1c1ea9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -636,24 +636,13 @@ pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
- const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
if (url == NULL) {
L_WARN("[PSA TCP] Error got endpoint without tcp url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) {
- if (scope == NULL && eScope == NULL) {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- }
- }
+ pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
}
return status;
@@ -690,6 +679,23 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
+
+ char *line = celix_utils_strdup(commandLine);
+ char *token = line;
+ strtok_r(line, " ", &token); //first token is command name
+ strtok_r(NULL, " ", &token); //second token is sub command
+
+ if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+ if (celix_utils_stringEquals(token, "nr_of_senders")) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ }
+
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
celixThreadMutex_lock(&psa->serializers.mutex);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index be5a694..daf36dc 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -97,6 +97,8 @@ typedef struct psa_tcp_connection_entry {
size_t writeMetaBufferSize;
void *writeMetaBuffer;
unsigned int retryCount;
+ celix_thread_mutex_t writeMutex;
+ celix_thread_mutex_t readMutex;
} psa_tcp_connection_entry_t;
//
@@ -129,6 +131,7 @@ struct pubsub_tcpHandler {
celix_thread_t thread;
bool running;
bool isEndPoint;
+ bool isBlocking;
};
static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
@@ -304,6 +307,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
if (fd >= 0) {
entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
entry->fd = fd;
+ celixThreadMutex_create(&entry->writeMutex, NULL);
+ celixThreadMutex_create(&entry->readMutex, NULL);
if (url)
entry->url = strndup(url, 1024 * 1024);
if (interface_url) {
@@ -359,6 +364,8 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
if (entry->writeFooterBuffer) free(entry->writeFooterBuffer);
if (entry->readMetaBuffer) free(entry->readMetaBuffer);
if (entry->writeMetaBuffer) free(entry->writeMetaBuffer);
+ celixThreadMutex_destroy(&entry->writeMutex);
+ celixThreadMutex_destroy(&entry->readMutex);
free(entry);
}
}
@@ -421,7 +428,9 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
entry = NULL;
}
- rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+ if (!handle->isBlocking) {
+ rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+ }
if (rc < 0) {
pubsub_tcpHandler_freeEntry(entry);
L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
@@ -750,6 +759,14 @@ void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool isEndPoint)
}
}
+void pubsub_tcpHandler_setBlocking(pubsub_tcpHandler_t *handle,bool isBlocking) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->isBlocking = isBlocking;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+}
+
static inline
void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
@@ -774,7 +791,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
// If the message is completely reassembled true is returned and the index and size have valid values
//
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
- celixThreadRwlock_writeLock(&handle->dbLock);
+ celixThreadRwlock_readLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
if (entry == NULL)
entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -788,7 +805,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
celixThreadRwlock_unlock(&handle->dbLock);
return -1;
}
-
+ celixThreadMutex_lock(&entry->readMutex);
if (entry->readHeaderBufferSize && entry->readHeaderBuffer) {
entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize);
}
@@ -936,6 +953,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
nbytes = 0; //Return 0 as indicator to close the connection
}
}
+ celixThreadMutex_unlock(&entry->readMutex);
celixThreadRwlock_unlock(&handle->dbLock);
return (int)nbytes;
}
@@ -984,12 +1002,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
int nofConnToClose = 0;
if (handle) {
- celixThreadRwlock_writeLock(&handle->dbLock);
+ celixThreadRwlock_readLock(&handle->dbLock);
hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->connected) continue;
+ celixThreadMutex_lock(&entry->writeMutex);
void *payloadData = NULL;
size_t payloadSize = 0;
if (msg_iov_len == 1) {
@@ -1029,6 +1048,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) {
L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n",
entry->fd);
+ celixThreadMutex_unlock(&entry->writeMutex);
continue;
}
@@ -1174,6 +1194,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
free(payloadData);
}
}
+ celixThreadMutex_unlock(&entry->writeMutex);
}
celixThreadRwlock_unlock(&handle->dbLock);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index a08911c..70eb29a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -66,6 +66,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned
void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool isEndPoint);
+void pubsub_tcpHandler_setBlocking(pubsub_tcpHandler_t *handle, bool isBlocking);
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 533e773..ccfbb57 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -24,17 +24,13 @@
#include <pubsub/subscriber.h>
#include <memory.h>
#include <pubsub_constants.h>
-#include <assert.h>
-#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
-#include <math.h>
#include "pubsub_tcp_handler.h"
#include "pubsub_tcp_topic_receiver.h"
#include "pubsub_psa_tcp_constants.h"
#include "pubsub_tcp_common.h"
-#include "celix_utils_api.h"
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
#include <pubsub_utils.h>
@@ -120,24 +116,14 @@ typedef struct psa_tcp_subscriber_entry {
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void *psa_tcp_recvThread(void *data);
-
static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-
static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
-
static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime);
-
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-
static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -219,6 +205,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+ bool blocking = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY, PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT);
+
pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
@@ -228,10 +216,10 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_tcpHandler_setThreadPriority(receiver->socketHandler, prio, sched);
pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
+ pubsub_tcpHandler_setBlocking(receiver->socketHandler, blocking);
}
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
- PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+ PSA_TCP_DEFAULT_METRICS_ENABLED);
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->thread.mutex, NULL);
@@ -346,7 +334,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
pubsub_tcpHandler_destroy(receiver->socketHandler);
receiver->socketHandler = NULL;
}
-
+ pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
if (receiver->scope != NULL) {
free(receiver->scope);
}
@@ -371,8 +359,7 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
return receiver->protocolSvcId;
}
-void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
- celix_array_list_t *unconnectedUrls) {
+void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -466,8 +453,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
- &entry->msgTypes);
+ int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes);
if (rc == 0) {
entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
@@ -498,7 +484,6 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co
long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
-
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
@@ -568,17 +553,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle,
- msgSer->msgName,
- msgSer->msgId,
- deSerializedMsg,
- message->metadata.metadata,
- &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
- msgType,
- msgId,
- deSerializedMsg,
- metadata);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
@@ -707,10 +683,8 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
- result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
- metrics->averageTimeBetweenMessagesInSeconds;
- result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
- metrics->averageSerializationTimeInSeconds;
+ result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+ result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
@@ -720,7 +694,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
metrics->msgTypeId);
}
}
- i += 1;
+ i +=1 ;
}
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -825,12 +799,11 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t
int versionMajor;
int versionMinor;
- if (msgVersion != NULL) {
+ if (msgVersion!=NULL) {
version_getMajor(msgVersion, &versionMajor);
version_getMinor(msgVersion, &versionMinor);
- if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */
- check = (minor >=
- ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+ if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+ check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
}
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index f7598f9..c49f642 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -490,8 +490,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se
result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
- mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+ result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
result->msgMetrics[i].bndId = entry->bndId;
result->msgMetrics[i].typeId = mEntry->type;
@@ -563,8 +562,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
entry->seqNr++;
bool sendOk = true;
{
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
- serializedIoVecOutputLen, 0);
+ int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
if (rc < 0) {
status = -1;
sendOk = false;
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index a78958a..b18f252 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut
add_celix_bundle(pubsub_endpoint_tst
#Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
SOURCES
- test/tst_activator.c
+ test/tst_endpoint_activator.c
VERSION 1.0.0
)
target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -213,7 +213,7 @@ if (BUILD_PUBSUB_PSA_TCP)
Celix::pubsub_serializer_json
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v1
+ Celix::pubsub_protocol_wire_v2
pubsub_loopback
pubsub_endpoint_sut
pubsub_endpoint_tst
diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/test/test/sut_endpoint_activator.c
index c52ebf7..f3d8fa8 100644
--- a/bundles/pubsub/test/test/sut_endpoint_activator.c
+++ b/bundles/pubsub/test/test/sut_endpoint_activator.c
@@ -97,9 +97,7 @@ static void* sut_sendThread(void *data) {
if (msg.seqNr % 1000 == 0) {
printf("Send %i messages\n", msg.seqNr);
}
-
msg.seqNr += 1;
-
}
pthread_mutex_unlock(&act->mutex);