You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/08/23 06:17:02 UTC
[celix] branch develop updated: Added pubsub admin websocket
This is an automated email from the ASF dual-hosted git repository.
rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new de7c3a1 Added pubsub admin websocket
new 41d1ba9 Merge pull request #39 from dhbfischer/feature/pubsub_admin_websocket
de7c3a1 is described below
commit de7c3a19359a9908c56801dd081630400729f288
Author: dfischer <ma...@daanfischer.nl>
AuthorDate: Tue Aug 6 17:26:56 2019 +0200
Added pubsub admin websocket
---
bundles/pubsub/CMakeLists.txt | 1 +
.../pubsub/pubsub_admin_websocket/CMakeLists.txt | 53 ++
.../pubsub_admin_websocket/src/psa_activator.c | 128 ++++
.../src/pubsub_psa_websocket_constants.h | 48 ++
.../src/pubsub_websocket_admin.c | 630 +++++++++++++++++
.../src/pubsub_websocket_admin.h | 54 ++
.../src/pubsub_websocket_common.c | 73 ++
.../src/pubsub_websocket_common.h | 55 ++
.../src/pubsub_websocket_topic_receiver.c | 787 +++++++++++++++++++++
.../src/pubsub_websocket_topic_receiver.h | 50 ++
.../src/pubsub_websocket_topic_sender.c | 477 +++++++++++++
.../src/pubsub_websocket_topic_sender.h | 48 ++
bundles/pubsub/test/CMakeLists.txt | 21 +
bundles/pubsub/test/meta_data/ping.properties | 1 +
14 files changed, 2426 insertions(+)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 0373347..a084aa5 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,6 +34,7 @@ if (PUBSUB)
add_subdirectory(pubsub_admin_tcp)
add_subdirectory(pubsub_admin_nanomsg)
add_subdirectory(pubsub_admin_udp_mc)
+ add_subdirectory(pubsub_admin_websocket)
add_subdirectory(keygen)
add_subdirectory(mock)
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
new file mode 100644
index 0000000..c992d5e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_package(Jansson REQUIRED)
+find_package(UUID REQUIRED)
+
+if(NOT UUID_LIBRARY)
+ #i.e. not found for OSX
+ set(UUID_LIBRARY "")
+ set(UUID_INCLUDE_DIRS "")
+endif()
+
+add_celix_bundle(celix_pubsub_admin_websocket
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+ VERSION "1.0.0"
+ GROUP "Celix/PubSub"
+ SOURCES
+ src/psa_activator.c
+ src/pubsub_websocket_admin.c
+ src/pubsub_websocket_topic_sender.c
+ src/pubsub_websocket_topic_receiver.c
+ src/pubsub_websocket_common.c
+)
+
+set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket PRIVATE
+ Celix::pubsub_spi
+ Celix::framework Celix::dfi Celix::log_helper Celix::utils
+ Celix::http_admin_api
+)
+target_include_directories(celix_pubsub_admin_websocket PRIVATE
+ ${JANSSON_INCLUDE_DIR}
+ ${UUID_INCLUDE_DIRS}
+ src
+)
+
+install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
new file mode 100644
index 0000000..a1d36b0
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_websocket_admin.h"
+#include "command.h"
+
+typedef struct psa_websocket_activator {
+ log_helper_t *logHelper;
+
+ pubsub_websocket_admin_t *admin;
+
+ long serializersTrackerId;
+
+ pubsub_admin_service_t adminService;
+ long adminSvcId;
+
+ pubsub_admin_metrics_service_t adminMetricsService;
+ long adminMetricsSvcId;
+
+ command_service_t cmdSvc;
+ long cmdSvcId;
+} psa_websocket_activator_t;
+
+int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t *ctx) {
+ act->adminSvcId = -1L;
+ act->cmdSvcId = -1L;
+ act->serializersTrackerId = -1L;
+
+ logHelper_create(ctx, &act->logHelper);
+ logHelper_start(act->logHelper);
+
+ act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
+ celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+ //track serializers (only json)
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.filter = "(pubsub.serializer=json)";
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = act->admin;
+ opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
+ opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
+ act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //register pubsub admin service
+ if (status == CELIX_SUCCESS) {
+ pubsub_admin_service_t *psaSvc = &act->adminService;
+ psaSvc->handle = act->admin;
+ psaSvc->matchPublisher = pubsub_websocketAdmin_matchPublisher;
+ psaSvc->matchSubscriber = pubsub_websocketAdmin_matchSubscriber;
+ psaSvc->matchDiscoveredEndpoint = pubsub_websocketAdmin_matchDiscoveredEndpoint;
+ psaSvc->setupTopicSender = pubsub_websocketAdmin_setupTopicSender;
+ psaSvc->teardownTopicSender = pubsub_websocketAdmin_teardownTopicSender;
+ psaSvc->setupTopicReceiver = pubsub_websocketAdmin_setupTopicReceiver;
+ psaSvc->teardownTopicReceiver = pubsub_websocketAdmin_teardownTopicReceiver;
+ psaSvc->addDiscoveredEndpoint = pubsub_websocketAdmin_addDiscoveredEndpoint;
+ psaSvc->removeDiscoveredEndpoint = pubsub_websocketAdmin_removeDiscoveredEndpoint;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_WEBSOCKET_ADMIN_TYPE);
+
+ act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+ }
+
+ if (status == CELIX_SUCCESS) {
+ act->adminMetricsService.handle = act->admin;
+ act->adminMetricsService.metrics = pubsub_websocketAdmin_metrics;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_WEBSOCKET_ADMIN_TYPE);
+
+ act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
+ }
+
+ //register shell command service
+ {
+ act->cmdSvc.handle = act->admin;
+ act->cmdSvc.executeCommand = pubsub_websocketAdmin_executeCommand;
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_websocket");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_websocket");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the websocket PSA");
+ act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ }
+
+ return status;
+}
+
+int psa_websocket_stop(psa_websocket_activator_t *act, celix_bundle_context_t *ctx) {
+ celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+ celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
+ celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+ pubsub_websocketAdmin_destroy(act->admin);
+
+ logHelper_stop(act->logHelper);
+ logHelper_destroy(&act->logHelper);
+
+ return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_websocket_activator_t, psa_websocket_start, psa_websocket_stop);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
new file mode 100644
index 0000000..c6fba82
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_
+#define PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_
+
+#define PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE 30
+#define PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE 70
+#define PSA_WEBSOCKET_DEFAULT_SCORE 30
+
+#define PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY "PSA_WEBSOCKET_QOS_SAMPLE_SCORE"
+#define PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY "PSA_WEBSOCKET_QOS_CONTROL_SCORE"
+#define PSA_WEBSOCKET_DEFAULT_SCORE_KEY "PSA_WEBSOCKET_DEFAULT_SCORE"
+
+
+#define PSA_WEBSOCKET_METRICS_ENABLED "PSA_WEBSOCKET_METRICS_ENABLED"
+#define PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED true
+
+#define PUBSUB_WEBSOCKET_VERBOSE_KEY "PSA_WEBSOCKET_VERBOSE"
+#define PUBSUB_WEBSOCKET_VERBOSE_DEFAULT true
+
+#define PUBSUB_WEBSOCKET_ADMIN_TYPE "websocket"
+#define PUBSUB_WEBSOCKET_ADDRESS_KEY "websocket.socket_address"
+#define PUBSUB_WEBSOCKET_PORT_KEY "websocket.socket_port"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES "websocket.static.connect.socket_addresses"
+
+#endif /* PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
new file mode 100644
index 0000000..a7fbe29
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory.h>
+#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
+#include <ip_utils.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_websocket_admin.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_topic_sender.h"
+#include "pubsub_websocket_topic_receiver.h"
+#include "pubsub_websocket_common.h"
+
+#define L_DEBUG(...) \
+ logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_websocket_admin {
+ celix_bundle_context_t *ctx;
+ log_helper_t *log;
+ const char *fwUUID;
+
+ double qosSampleScore;
+ double qosControlScore;
+ double defaultScore;
+
+ bool verbose;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t*
+ } serializers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t*
+ } topicSenders;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t*
+ } topicReceivers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+ } discoveredEndpoints;
+
+};
+
+typedef struct psa_websocket_serializer_entry {
+ const char *serType;
+ long svcId;
+ pubsub_serializer_service_t *svc;
+} psa_websocket_serializer_entry_t;
+
+static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+
+
+pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+ pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
+ psa->ctx = ctx;
+ psa->log = logHelper;
+ psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_WEBSOCKET_VERBOSE_KEY, PUBSUB_WEBSOCKET_VERBOSE_DEFAULT);
+ psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+ psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_DEFAULT_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_SCORE);
+ psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
+ psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
+
+ celixThreadMutex_create(&psa->serializers.mutex, NULL);
+ psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+ celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+ psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+ celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+ psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+ celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+ psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+ return psa;
+}
+
+void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
+ if (psa == NULL) {
+ return;
+ }
+
+ //note assuming al psa register services and service tracker are removed.
+
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ pubsub_websocketTopicSender_destroy(sender);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+ pubsub_websocketTopicReceiver_destroy(recv);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+ celix_properties_destroy(ep);
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ iter = hashMapIterator_construct(psa->serializers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
+ free(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ celixThreadMutex_destroy(&psa->topicSenders.mutex);
+ hashMap_destroy(psa->topicSenders.map, true, false);
+
+ celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+ hashMap_destroy(psa->topicReceivers.map, true, false);
+
+ celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+ hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+
+ celixThreadMutex_destroy(&psa->serializers.mutex);
+ hashMap_destroy(psa->serializers.map, false, false);
+
+ free(psa);
+}
+
+void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_websocket_admin_t *psa = handle;
+
+ const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ if (serType == NULL) {
+ L_INFO("[PSA_WEBSOCKET] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+ return;
+ }
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ psa_websocket_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->serType = serType;
+ entry->svcId = svcId;
+ entry->svc = svc;
+ hashMap_put(psa->serializers.map, (void*)svcId, entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_websocket_admin_t *psa = handle;
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ //remove serializer
+ // 1) First find entry and
+ // 2) loop and destroy all topic sender using the serializer and
+ // 3) loop and destroy all topic receivers using the serializer
+ // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ psa_websocket_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ if (entry != NULL) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+ if (sender != NULL && entry->svcId == pubsub_websocketTopicSender_serializerSvcId(sender)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_websocketTopicSender_destroy(sender);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+ if (receiver != NULL && entry->svcId == pubsub_websocketTopicReceiver_serializerSvcId(receiver)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_websocketTopicReceiver_destroy(receiver);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ free(entry);
+ }
+}
+
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+ pubsub_websocket_admin_t *psa = handle;
+ L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
+ celix_status_t status = CELIX_SUCCESS;
+ double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+ topicProperties, outSerializerSvcId);
+ *outScore = score;
+
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+ pubsub_websocket_admin_t *psa = handle;
+ L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
+ celix_status_t status = CELIX_SUCCESS;
+ double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+ topicProperties, outSerializerSvcId);
+ if (outScore != NULL) {
+ *outScore = score;
+ }
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+ pubsub_websocket_admin_t *psa = handle;
+ L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
+ celix_status_t status = CELIX_SUCCESS;
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, NULL);
+ if (outMatch != NULL) {
+ *outMatch = match;
+ }
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+ pubsub_websocket_admin_t *psa = handle;
+ celix_status_t status = CELIX_SUCCESS;
+
+ //1) Create TopicSender
+ //2) Store TopicSender
+ //3) Connect existing endpoints
+ //4) set outPublisherEndpoint
+
+ celix_properties_t *newEndpoint = NULL;
+
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+ if (sender == NULL) {
+ psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serEntry != NULL) {
+ sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+ }
+ if (sender != NULL) {
+ const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+ serType, NULL);
+
+ //Set endpoint visibility to local because the http server handles discovery
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+ if (cn != NULL) {
+ celix_properties_set(newEndpoint, "container_name", cn);
+ }
+ hashMap_put(psa->topicSenders.map, key, sender);
+ } else {
+ L_ERROR("[PSA_WEBSOCKET] Error creating a TopicSender");
+ free(key);
+ }
+ } else {
+ free(key);
+ L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+ *outPublisherEndpoint = newEndpoint;
+ }
+
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+ pubsub_websocket_admin_t *psa = handle;
+ celix_status_t status = CELIX_SUCCESS;
+
+ //1) Find and remove TopicSender from map
+ //2) destroy topic sender
+
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
+ if (entry != NULL) {
+ char *mapKey = hashMapEntry_getKey(entry);
+ pubsub_websocket_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
+ free(mapKey);
+ pubsub_websocketTopicSender_destroy(sender);
+ } else {
+ L_ERROR("[PSA_WEBSOCKET] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ free(key);
+
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+ pubsub_websocket_admin_t *psa = handle;
+
+ celix_properties_t *newEndpoint = NULL;
+
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+ if (receiver == NULL) {
+ psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serEntry != NULL) {
+ receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
+ } else {
+ L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicSender %s/%s", scope, topic);
+ }
+ if (receiver != NULL) {
+ const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+
+ //Set endpoint visibility to local because the http server handles discovery
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+ if (cn != NULL) {
+ celix_properties_set(newEndpoint, "container_name", cn);
+ }
+ hashMap_put(psa->topicReceivers.map, key, receiver);
+ } else {
+ L_ERROR("[PSA_WEBSOCKET] Error creating a TopicReceiver.");
+ free(key);
+ }
+ } else {
+ free(key);
+ L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ if (receiver != NULL && newEndpoint != NULL) {
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ }
+
+ if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+ *outSubscriberEndpoint = newEndpoint;
+ }
+
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
+ pubsub_websocket_admin_t *psa = handle;
+
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+ free(key);
+ if (entry != NULL) {
+ char *receiverKey = hashMapEntry_getKey(entry);
+ pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+ hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+ free(receiverKey);
+ pubsub_websocketTopicReceiver_destroy(receiver);
+ }
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+
+static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+ //note can be called with discoveredEndpoint.mutex lock
+ celix_status_t status = CELIX_SUCCESS;
+
+ const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+ const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *sockAddress = celix_properties_get(endpoint, PUBSUB_WEBSOCKET_ADDRESS_KEY, NULL);
+ long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_WEBSOCKET_PORT_KEY, -1L);
+
+ bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+ if (publisher && (sockAddress == NULL || sockPort < 0)) {
+ L_WARN("[PSA WEBSOCKET] Error got endpoint without websocket address/port or endpoint type. Properties:");
+ const char *key = NULL;
+ CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+ L_WARN("[PSA WEBSOCKET] |- %s=%s\n", key, celix_properties_get(endpoint, key, NULL));
+ }
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ if (eScope != NULL && eTopic != NULL &&
+ strncmp(eScope, scope, 1024 * 1024) == 0 &&
+ strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ char *uri = psa_websocket_createURI(eScope, eTopic);
+ pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress, sockPort, uri);
+ }
+ }
+
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+ pubsub_websocket_admin_t *psa = handle;
+
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+ if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ celix_properties_t *cpy = celix_properties_copy(endpoint);
+ const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
+ hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+
+
+static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+ //note can be called with discoveredEndpoint.mutex lock
+ celix_status_t status = CELIX_SUCCESS;
+
+ const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+ const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+
+ if (eScope != NULL && eTopic != NULL &&
+ strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 1024 * 1024) == 0) {
+ char *uri = psa_websocket_createURI(eScope, eTopic);
+ pubsub_websocketTopicReceiver_disconnectFrom(receiver, uri);
+
+ }
+
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+ pubsub_websocket_admin_t *psa = handle;
+
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+ if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_websocketAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+ celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid);
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+ if (found != NULL) {
+ celix_properties_destroy(found);
+ }
+
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+
+celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+ pubsub_websocket_admin_t *psa = handle;
+ celix_status_t status = CELIX_SUCCESS;
+
+ fprintf(out, "\n");
+ fprintf(out, "Topic Senders:\n");
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ long serSvcId = pubsub_websocketTopicSender_serializerSvcId(sender);
+ psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+ const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ const char *scope = pubsub_websocketTopicSender_scope(sender);
+ const char *topic = pubsub_websocketTopicSender_topic(sender);
+ const char *url = pubsub_websocketTopicSender_url(sender);
+// const char *postUrl = pubsub_websocketTopicSender_isStatic(sender) ? " (static)" : "";
+ fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+ fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- url = %s\n", url);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ fprintf(out, "\n");
+ fprintf(out, "\nTopic Receivers:\n");
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ long serSvcId = pubsub_websocketTopicReceiver_serializerSvcId(receiver);
+ psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+ const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+ const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+ celix_array_list_t *connected = celix_arrayList_create();
+ celix_array_list_t *unconnected = celix_arrayList_create();
+ pubsub_websocketTopicReceiver_listConnections(receiver, connected, unconnected);
+
+ fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+ fprintf(out, " |- serializer type = %s\n", serType);
+ for (int i = 0; i < celix_arrayList_size(connected); ++i) {
+ char *url = celix_arrayList_get(connected, i);
+ fprintf(out, " |- connected url = %s\n", url);
+ free(url);
+ }
+ for (int i = 0; i < celix_arrayList_size(unconnected); ++i) {
+ char *url = celix_arrayList_get(unconnected, i);
+ fprintf(out, " |- unconnected url = %s\n", url);
+ free(url);
+ }
+ celix_arrayList_destroy(connected);
+ celix_arrayList_destroy(unconnected);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+ fprintf(out, "\n");
+
+ return status;
+}
+
+pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle) {
+ pubsub_websocket_admin_t *psa = handle;
+ pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
+ snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_WEBSOCKET_ADMIN_TYPE);
+ result->senders = celix_arrayList_create();
+ result->receivers = celix_arrayList_create();
+
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ pubsub_admin_sender_metrics_t *metrics = pubsub_websocketTopicSender_metrics(sender);
+ celix_arrayList_add(result->senders, metrics);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_admin_receiver_metrics_t *metrics = pubsub_websocketTopicReceiver_metrics(receiver);
+ celix_arrayList_add(result->receivers, metrics);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ return result;
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
new file mode 100644
index 0000000..62a14a9
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+#define CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_api.h"
+#include "log_helper.h"
+#include "pubsub_psa_websocket_constants.h"
+
+typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
+
+pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
+
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+
+void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
+celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
new file mode 100644
index 0000000..ce85e40
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory.h>
+#include <assert.h>
+#include <stdio.h>
+#include "pubsub_websocket_common.h"
+
+int psa_websocket_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
+ *msgTypeId = utils_stringHash(msgType);
+ return 0;
+}
+
+bool psa_websocket_checkVersion(version_pt msgVersion, const pubsub_websocket_msg_header_t *hdr) {
+ bool check=false;
+ int major=0,minor=0;
+
+ if (hdr->major == 0 && hdr->minor == 0) {
+ //no check
+ return true;
+ }
+
+ if (msgVersion!=NULL) {
+ version_getMajor(msgVersion,&major);
+ version_getMinor(msgVersion,&minor);
+ if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
+ check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+ }
+ }
+
+ return check;
+}
+
+void psa_websocket_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter) {
+ for (int i = 0; i < 5; ++i) {
+ filter[i] = '\0';
+ }
+ if (scope != NULL && strnlen(scope, 3) >= 2) {
+ filter[0] = scope[0];
+ filter[1] = scope[1];
+ }
+ if (topic != NULL && strnlen(topic, 3) >= 2) {
+ filter[2] = topic[0];
+ filter[3] = topic[1];
+ }
+}
+
+char *psa_websocket_createURI(const char *scope, const char *topic) {
+ char *uri = NULL;
+ if(scope != NULL && topic != NULL) {
+ asprintf(&uri, "/pubsub/%s/%s", scope, topic);
+ }
+ else if(scope == NULL && topic != NULL) {
+ asprintf(&uri, "/pubsub/default/%s", topic);
+ }
+ return uri;
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
new file mode 100644
index 0000000..89ec65a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_COMMON_H
+#define CELIX_PUBSUB_WEBSOCKET_COMMON_H
+
+#include <utils.h>
+#include <stdint.h>
+
+#include "version.h"
+
+
+struct pubsub_websocket_msg_header {
+ uint32_t type; //msg type id (hash of fqn)
+ uint8_t major;
+ uint8_t minor;
+ uint32_t seqNr;
+ unsigned char originUUID[16];
+ uint64_t sendtimeSeconds; //seconds since epoch
+ uint64_t sendTimeNanoseconds; //ns since epoch
+};
+
+typedef struct pubsub_websocket_msg_header pubsub_websocket_msg_header_t;
+
+struct pubsub_websocket_msg {
+ pubsub_websocket_msg_header_t header;
+ unsigned int payloadSize;
+ char payload[];
+};
+
+typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
+
+int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
+void psa_websocket_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
+char *psa_websocket_createURI(const char *scope, const char *topic);
+
+bool psa_websocket_checkVersion(version_pt msgVersion, const pubsub_websocket_msg_header_t *hdr);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
new file mode 100644
index 0000000..4a41a0e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -0,0 +1,787 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include <pubsub_endpoint.h>
+#include <arpa/inet.h>
+#include <log_helper.h>
+#include <math.h>
+#include "pubsub_websocket_topic_receiver.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_common.h"
+
+#include <uuid/uuid.h>
+#include <pubsub_admin_metrics.h>
+#include <http_admin/api.h>
+
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
+
+#define L_DEBUG(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+typedef struct pubsub_websocket_rcv_buffer {
+ celix_thread_mutex_t mutex;
+ celix_array_list_t *list; //List of received websocket messages (type: pubsub_websocket_msg_t *)
+ celix_array_list_t *rcvTimes; //Corresponding receive times of the received websocket messages (rcvTimes[i] -> list[i])
+} pubsub_websocket_rcv_buffer_t;
+
+struct pubsub_websocket_topic_receiver {
+ celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
+ char *scope;
+ char *topic;
+ char scopeAndTopicFilter[5];
+ char *uri;
+ bool metricsEnabled;
+
+ pubsub_websocket_rcv_buffer_t recvBuffer;
+
+ struct {
+ celix_thread_t thread;
+ celix_thread_mutex_t mutex;
+ bool running;
+ } recvThread;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = url (host:port), value = psa_websocket_requested_connection_entry_t*
+ bool allConnected; //true if all requestedConnectection are connected
+ } requestedConnections;
+
+ long subscriberTrackerId;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = bnd id, value = psa_websocket_subscriber_entry_t
+ bool allInitialized;
+ } subscribers;
+};
+
+typedef struct psa_websocket_requested_connection_entry {
+ pubsub_websocket_rcv_buffer_t *recvBuffer;
+ char *key; //host:port
+ char *socketAddress;
+ long socketPort;
+ char *uri;
+ struct mg_connection *sockConnection;
+ int connectRetryCount;
+ bool connected;
+ bool statically; //true if the connection is statically configured through the topic properties.
+} psa_websocket_requested_connection_entry_t;
+
+typedef struct psa_websocket_subscriber_metrics_entry_t {
+ unsigned int msgTypeId;
+ uuid_t origin;
+
+ unsigned long nrOfMessagesReceived;
+ unsigned long nrOfSerializationErrors;
+ struct timespec lastMessageReceived;
+ double averageTimeBetweenMessagesInSeconds;
+ double averageSerializationTimeInSeconds;
+ double averageDelayInSeconds;
+ double maxDelayInSeconds;
+ double minDelayInSeconds;
+ unsigned int lastSeqNr;
+ unsigned long nrOfMissingSeqNumbers;
+} psa_websocket_subscriber_metrics_entry_t;
+
+typedef struct psa_websocket_subscriber_entry {
+ int usageCount;
+ hash_map_t *msgTypes; //map from serializer svc
+ hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_websocket_subscriber_metrics_entry_t*
+ pubsub_subscriber_t *svc;
+ bool initialized; //true if the init function is called through the receive thread
+} psa_websocket_subscriber_entry_t;
+
+
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void* psa_websocket_recvThread(void * data);
+static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver);
+static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver);
+
+static int psa_websocketTopicReceiver_data(struct mg_connection *connection, int op_code, char *data, size_t length, void *handle);
+static void psa_websocketTopicReceiver_close(const struct mg_connection *connection, void *handle);
+
+
+pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ const celix_properties_t *topicProperties,
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer) {
+ pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+ receiver->ctx = ctx;
+ receiver->logHelper = logHelper;
+ receiver->serializerSvcId = serializerSvcId;
+ receiver->serializer = serializer;
+ receiver->scope = strndup(scope, 1024 * 1024);
+ receiver->topic = strndup(topic, 1024 * 1024);
+ psa_websocket_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
+ receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
+
+ receiver->uri = psa_websocket_createURI(scope, topic);
+
+ if (receiver->uri != NULL) {
+ celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+ celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+ celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
+ celixThreadMutex_create(&receiver->recvBuffer.mutex, NULL);
+
+ receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ arrayList_create(&receiver->recvBuffer.list);
+ arrayList_create(&receiver->recvBuffer.rcvTimes);
+ }
+
+ //track subscribers
+ if (receiver->uri != NULL) {
+ int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+ char buf[size+1];
+ snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+ opts.filter.filter = buf;
+ opts.callbackHandle = receiver;
+ opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber;
+ opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber;
+
+ receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+
+ const char *staticConnects = celix_properties_get(topicProperties, PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
+ if (staticConnects != NULL) {
+ char *copy = strndup(staticConnects, 1024*1024);
+ char* addr;
+ char* save = copy;
+
+ while ((addr = strtok_r(save, " ", &save))) {
+ char *colon = strchr(addr, ':');
+ if (colon == NULL) {
+ continue;
+ }
+
+ char *sockAddr = NULL;
+ asprintf(&sockAddr, "%.*s", (int)(colon - addr), addr);
+
+ long sockPort = atol((colon + 1));
+
+ char *key = NULL;
+ asprintf(&key, "%s:%li", sockAddr, sockPort);
+
+
+ if (sockPort > 0) {
+ psa_websocket_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->key = key;
+ entry->uri = strndup(receiver->uri, 1024 * 1024);
+ entry->socketAddress = sockAddr;
+ entry->socketPort = sockPort;
+ entry->connected = false;
+ entry->statically = true;
+ entry->recvBuffer = &receiver->recvBuffer;
+ hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+ } else {
+ L_WARN("[PSA_WEBSOCKET_TR] Invalid static socket address %s", addr);
+ free(key);
+ free(sockAddr);
+ }
+ }
+ free(copy);
+ }
+
+
+ if (receiver->uri != NULL) {
+ receiver->recvThread.running = true;
+ celixThread_create(&receiver->recvThread.thread, NULL, psa_websocket_recvThread, receiver);
+ char name[64];
+ snprintf(name, 64, "WEBSOCKET TR %s/%s", scope, topic);
+ celixThread_setName(&receiver->recvThread.thread, name);
+ }
+
+ if (receiver->uri == NULL) {
+ free(receiver->scope);
+ free(receiver->topic);
+ free(receiver);
+ receiver = NULL;
+ L_ERROR("[PSA_WEBSOCKET] Cannot create TopicReceiver for %s/%s", scope, topic);
+ }
+
+ return receiver;
+}
+
+void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver) {
+ if (receiver != NULL) {
+
+ celixThreadMutex_lock(&receiver->recvThread.mutex);
+ receiver->recvThread.running = false;
+ celixThreadMutex_unlock(&receiver->recvThread.mutex);
+ celixThread_join(receiver->recvThread.thread, NULL);
+
+ celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+ hashMap_destroy(origins, true, true);
+ }
+ hashMap_destroy(entry->metrics, false, false);
+
+ receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ free(entry);
+ }
+
+ }
+ hashMap_destroy(receiver->subscribers.map, false, false);
+
+
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ if(entry->connected) {
+ mg_close_connection(entry->sockConnection);
+ }
+ free(entry->uri);
+ free(entry->socketAddress);
+ free(entry->key);
+ free(entry);
+ }
+ }
+ hashMap_destroy(receiver->requestedConnections.map, false, false);
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_destroy(&receiver->subscribers.mutex);
+ celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+ celixThreadMutex_destroy(&receiver->recvThread.mutex);
+
+ celixThreadMutex_destroy(&receiver->recvBuffer.mutex);
+ int msgBufSize = celix_arrayList_size(receiver->recvBuffer.list);
+ while(msgBufSize > 0) {
+ pubsub_websocket_msg_t *msg = celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1);
+ free(msg);
+ msgBufSize--;
+ }
+ celix_arrayList_destroy(receiver->recvBuffer.list);
+
+ int rcvTimesSize = celix_arrayList_size(receiver->recvBuffer.rcvTimes);
+ while(rcvTimesSize > 0) {
+ struct timespec *time = celix_arrayList_get(receiver->recvBuffer.rcvTimes, rcvTimesSize - 1);
+ free(time);
+ rcvTimesSize--;
+ }
+ celix_arrayList_destroy(receiver->recvBuffer.rcvTimes);
+
+ free(receiver->uri);
+ free(receiver->scope);
+ free(receiver->topic);
+ }
+ free(receiver);
+}
+
+const char* pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t *receiver) {
+ return receiver->scope;
+}
+const char* pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t *receiver) {
+ return receiver->topic;
+}
+
+long pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t *receiver) {
+ return receiver->serializerSvcId;
+}
+
+void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ char *url = NULL;
+ asprintf(&url, "%s%s", entry->uri, entry->statically ? " (static)" : "");
+ if (entry->connected) {
+ celix_arrayList_add(connectedUrls, url);
+ } else {
+ celix_arrayList_add(unconnectedUrls, url);
+ }
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *receiver, const char *socketAddress, long socketPort, const char *uri) {
+ L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s connecting to websocket uri %s", receiver->scope, receiver->topic, uri);
+
+ char *key = NULL;
+ asprintf(&key, "%s:%li", socketAddress, socketPort);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->key = key;
+ entry->uri = strndup(uri, 1024 * 1024);
+ entry->socketAddress = strndup(socketAddress, 1024 * 1024);
+ entry->socketPort = socketPort;
+ entry->connected = false;
+ entry->statically = false;
+ entry->recvBuffer = &receiver->recvBuffer;
+ hashMap_put(receiver->requestedConnections.map, (void*)entry->uri, entry);
+ receiver->requestedConnections.allConnected = false;
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ psa_websocket_connectToAllRequestedConnections(receiver);
+}
+
+void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t *receiver, const char *uri) {
+ L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s disconnect from websocket uri %s", receiver->scope, receiver->topic, uri);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ psa_websocket_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, uri);
+ if (entry != NULL && entry->connected) {
+ mg_close_connection(entry->sockConnection);
+ L_WARN("[PSA_WEBSOCKET] Error disconnecting from websocket uri %s.", uri);
+ }
+ if (entry != NULL) {
+ free(entry->socketAddress);
+ free(entry->uri);
+ free(entry->key);
+ free(entry);
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+ pubsub_websocket_topic_receiver_t *receiver = handle;
+
+ long bndId = celix_bundle_getId(bnd);
+ const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+ if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+ //not the same scope. ignore
+ return;
+ }
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
+ if (entry != NULL) {
+ entry->usageCount += 1;
+ } else {
+ //new create entry
+ entry = calloc(1, sizeof(*entry));
+ entry->usageCount = 1;
+ entry->svc = svc;
+ entry->initialized = false;
+
+ int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+
+ if (rc == 0) {
+ entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+ hash_map_t *origins = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ hashMap_put(entry->metrics, (void*)(uintptr_t)msgSer->msgId, origins);
+ }
+ }
+
+ if (rc == 0) {
+ hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+ } else {
+ L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ free(entry);
+ }
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props __attribute__((unused)), const celix_bundle_t *bnd) {
+ pubsub_websocket_topic_receiver_t *receiver = handle;
+
+ long bndId = celix_bundle_getId(bnd);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
+ if (entry != NULL) {
+ entry->usageCount -= 1;
+ }
+ if (entry != NULL && entry->usageCount <= 0) {
+ //remove entry
+ hashMap_remove(receiver->subscribers.map, (void*)bndId);
+ int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ if (rc != 0) {
+ L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ }
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_t *origins = hashMapIterator_nextValue(&iter);
+ hashMap_destroy(origins, true, true);
+ }
+ hashMap_destroy(entry->metrics, false, false);
+ free(entry);
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, const pubsub_websocket_msg_header_t *hdr, const void* payload, size_t payloadSize, struct timespec *receiveTime) {
+ //NOTE receiver->subscribers.mutex locked
+ pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
+ pubsub_subscriber_t *svc = entry->svc;
+ bool monitor = receiver->metricsEnabled;
+
+ //monitoring
+ struct timespec beginSer;
+ struct timespec endSer;
+ int updateReceiveCount = 0;
+ int updateSerError = 0;
+
+ if (msgSer!= NULL) {
+ void *deserializedMsg = NULL;
+ bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion, hdr);
+ if (validVersion) {
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &beginSer);
+ }
+ celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &endSer);
+ }
+ if (status == CELIX_SUCCESS) {
+ bool release = true;
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+ if (release) {
+ msgSer->freeMsg(msgSer->handle, deserializedMsg);
+ }
+ updateReceiveCount += 1;
+ } else {
+ updateSerError += 1;
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+ }
+ }
+ } else {
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X", hdr->type);
+ }
+
+ if (msgSer != NULL && monitor) {
+ hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )hdr->type);
+ char uuidStr[UUID_STR_LEN+1];
+ uuid_unparse(hdr->originUUID, uuidStr);
+ psa_websocket_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
+
+ if (metrics == NULL) {
+ metrics = calloc(1, sizeof(*metrics));
+ hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
+ uuid_copy(metrics->origin, hdr->originUUID);
+ metrics->msgTypeId = hdr->type;
+ metrics->maxDelayInSeconds = -INFINITY;
+ metrics->minDelayInSeconds = INFINITY;
+ metrics->lastSeqNr = 0;
+ }
+
+ double diff = celix_difftime(&beginSer, &endSer);
+ long n = metrics->nrOfMessagesReceived;
+ metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
+
+ diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+ n = metrics->nrOfMessagesReceived;
+ if (metrics->nrOfMessagesReceived >= 1) {
+ metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+ }
+ metrics->lastMessageReceived = *receiveTime;
+
+
+ int incr = hdr->seqNr - metrics->lastSeqNr;
+ if (metrics->lastSeqNr >0 && incr > 1) {
+ metrics->nrOfMissingSeqNumbers += (incr - 1);
+ L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
+ }
+ metrics->lastSeqNr = hdr->seqNr;
+
+ struct timespec sendTime;
+ sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
+ sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
+ diff = celix_difftime(&sendTime, receiveTime);
+ metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
+ if (diff < metrics->minDelayInSeconds) {
+ metrics->minDelayInSeconds = diff;
+ }
+ if (diff > metrics->maxDelayInSeconds) {
+ metrics->maxDelayInSeconds = diff;
+ }
+
+ metrics->nrOfMessagesReceived += updateReceiveCount;
+ metrics->nrOfSerializationErrors += updateSerError;
+ }
+}
+
+static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const pubsub_websocket_msg_header_t *hdr, const char *payload, size_t payloadSize, struct timespec *receiveTime) {
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime);
+ }
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void* psa_websocket_recvThread(void * data) {
+ pubsub_websocket_topic_receiver_t *receiver = data;
+
+ celixThreadMutex_lock(&receiver->recvThread.mutex);
+ bool running = receiver->recvThread.running;
+ celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ bool allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ bool allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
+ while (running) {
+ if (!allConnected) {
+ psa_websocket_connectToAllRequestedConnections(receiver);
+ }
+ if (!allInitialized) {
+ psa_websocket_initializeAllSubscribers(receiver);
+ }
+
+ while(celix_arrayList_size(receiver->recvBuffer.list) > 0) {
+ celixThreadMutex_lock(&receiver->recvBuffer.mutex);
+ pubsub_websocket_msg_t *msg = (pubsub_websocket_msg_t *) celix_arrayList_get(receiver->recvBuffer.list, 0);
+ struct timespec *rcvTime = (struct timespec *) celix_arrayList_get(receiver->recvBuffer.rcvTimes, 0);
+ celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
+
+ processMsg(receiver, &msg->header, msg->payload, msg->payloadSize, rcvTime);
+ free(msg);
+ free(rcvTime);
+
+ celixThreadMutex_lock(&receiver->recvBuffer.mutex);
+ celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
+ celix_arrayList_removeAt(receiver->recvBuffer.rcvTimes, 0);
+ celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
+ }
+
+ celixThreadMutex_lock(&receiver->recvThread.mutex);
+ running = receiver->recvThread.running;
+ celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+ } // while
+
+ return NULL;
+}
+
+static int psa_websocketTopicReceiver_data(struct mg_connection *connection __attribute__((unused)),
+ int op_code __attribute__((unused)),
+ char *data,
+ size_t length,
+ void *handle) {
+ //Received a websocket message, append this message to the buffer of the receiver.
+ if (handle != NULL) {
+ psa_websocket_requested_connection_entry_t *entry = (psa_websocket_requested_connection_entry_t *) handle;
+
+ pubsub_websocket_msg_t *rcvdMsg = malloc(length);
+ memcpy(rcvdMsg, data, length);
+
+ //Check if payload is completely received
+ unsigned long rcvdPayloadSize = length - sizeof(rcvdMsg->header) - sizeof(rcvdMsg->payloadSize);
+ if(rcvdMsg->payloadSize == rcvdPayloadSize) {
+ celixThreadMutex_lock(&entry->recvBuffer->mutex);
+ celix_arrayList_add(entry->recvBuffer->list, rcvdMsg);
+ struct timespec *receiveTime = malloc(sizeof(*receiveTime));
+ clock_gettime(CLOCK_REALTIME, receiveTime);
+ celix_arrayList_add(entry->recvBuffer->rcvTimes, receiveTime);
+ celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+ } else {
+ free(rcvdMsg);
+ }
+ }
+
+ return 1; //keep open (non-zero), 0 to close the socket
+}
+
+static void psa_websocketTopicReceiver_close(const struct mg_connection *connection __attribute__((unused)), void *handle) {
+ //Reset connection for this receiver entry
+ if (handle != NULL) {
+ psa_websocket_requested_connection_entry_t *entry = (psa_websocket_requested_connection_entry_t *) handle;
+ entry->connected = false;
+ entry->sockConnection = NULL;
+ }
+}
+
+pubsub_admin_receiver_metrics_t* pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t *receiver) {
+ pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+ snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
+
+ size_t msgTypesCount = 0;
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hashMapIterator_nextValue(&iter2);
+ msgTypesCount += 1;
+ }
+ }
+
+ result->nrOfMsgTypes = (unsigned long)msgTypesCount;
+ result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
+ int i = 0;
+ iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+ result->msgTypes[i].origins = calloc((size_t)hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
+ result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
+ int k = 0;
+ hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
+ while (hashMapIterator_hasNext(&iter3)) {
+ psa_websocket_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3);
+ result->msgTypes[i].typeId = metrics->msgTypeId;
+ pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)metrics->msgTypeId);
+ if (msgSer) {
+ snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+ uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
+ result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
+ result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
+ result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
+ result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
+ result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
+ result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+ result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
+ result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
+ result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
+
+ k += 1;
+ } else {
+ L_WARN("[PSA_WEBSOCKET]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId);
+ }
+ }
+ i +=1 ;
+ }
+ }
+
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+ return result;
+}
+
+
+static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver) {
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ if (!receiver->requestedConnections.allConnected) {
+ bool allConnected = true;
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->connected) {
+ char errBuf[100] = {0};
+ entry->sockConnection = mg_connect_websocket_client(entry->socketAddress,
+ (int) entry->socketPort,
+ 0, // No ssl
+ errBuf,
+ (size_t) sizeof(errBuf),
+ entry->uri,
+ NULL,
+ psa_websocketTopicReceiver_data,
+ psa_websocketTopicReceiver_close,
+ entry);
+ if(entry->sockConnection != NULL) {
+ entry->connected = true;
+ entry->connectRetryCount = 0;
+ } else {
+ entry->connectRetryCount += 1;
+ allConnected = false;
+ if((entry->connectRetryCount % 10) == 0) {
+ L_WARN("[PSA_WEBSOCKET] Error connecting to websocket %s:%li/%s. Error: %s",
+ entry->socketAddress,
+ entry->socketPort,
+ entry->uri, errBuf);
+ }
+ }
+ }
+ }
+ receiver->requestedConnections.allConnected = allConnected;
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver) {
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ if (!receiver->subscribers.allInitialized) {
+ bool allInitialized = true;
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (!entry->initialized) {
+ int rc = 0;
+ if (entry->svc != NULL && entry->svc->init != NULL) {
+ rc = entry->svc->init(entry->svc->handle);
+ }
+ if (rc == 0) {
+ entry->initialized = true;
+ } else {
+ L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+ allInitialized = false;
+ }
+ }
+ }
+ receiver->subscribers.allInitialized = allInitialized;
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
new file mode 100644
index 0000000..1a00cfa
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t;
+
+pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ const celix_properties_t *topicProperties,
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer);
+void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver);
+
+const char* pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t *receiver);
+const char* pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t *receiver);
+
+long pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t *receiver);
+void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls);
+
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *receiver, const char *socketAddress, long socketPort, const char *uri);
+void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t *receiver, const char *uri);
+
+
+pubsub_admin_receiver_metrics_t* pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t *receiver);
+
+
+#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
new file mode 100644
index 0000000..42f66c7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <zconf.h>
+#include <log_helper.h>
+#include "pubsub_websocket_topic_sender.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_common.h"
+#include <uuid/uuid.h>
+#include <constants.h>
+#include "http_admin/api.h"
+#include "civetweb.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS 2
+
+#define L_DEBUG(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_websocket_topic_sender {
+ celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
+ uuid_t fwUUID;
+ bool metricsEnabled;
+
+ char *scope;
+ char *topic;
+ char scopeAndTopicFilter[5];
+ char *uri;
+
+ celix_websocket_service_t websockSvc;
+ long websockSvcId;
+ struct mg_connection *sockConnection;
+
+ struct {
+ long svcId;
+ celix_service_factory_t factory;
+ } publisher;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = bndId, value = psa_websocket_bounded_service_entry_t
+ } boundedServices;
+};
+
+typedef struct psa_websocket_send_msg_entry {
+ pubsub_websocket_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
+ pubsub_msg_serializer_t *msgSer;
+ celix_thread_mutex_t sendLock; //protects send & Seqnr
+ unsigned int seqNr;
+ struct {
+ celix_thread_mutex_t mutex; //protects entries in struct
+ unsigned long nrOfMessagesSend;
+ unsigned long nrOfMessagesSendFailed;
+ unsigned long nrOfSerializationErrors;
+ struct timespec lastMessageSend;
+ double averageTimeBetweenMessagesInSeconds;
+ double averageSerializationTimeInSeconds;
+ } metrics;
+} psa_websocket_send_msg_entry_t;
+
+typedef struct psa_websocket_bounded_service_entry {
+ pubsub_websocket_topic_sender_t *parent;
+ pubsub_publisher_t service;
+ long bndId;
+ hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+ hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t
+ int getCount;
+} psa_websocket_bounded_service_entry_t;
+
+
+static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender);
+
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+
+static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle);
+static void psa_websocketTopicSender_close(const struct mg_connection *connection, void *handle);
+
+pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
+ celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId,
+ pubsub_serializer_service_t *ser) {
+ pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
+ sender->ctx = ctx;
+ sender->logHelper = logHelper;
+ sender->serializerSvcId = serializerSvcId;
+ sender->serializer = ser;
+ psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+ const char* uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+ if (uuid != NULL) {
+ uuid_parse(uuid, sender->fwUUID);
+ }
+ sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
+
+ sender->uri = psa_websocket_createURI(scope, topic);
+
+ if (sender->uri != NULL) {
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, WEBSOCKET_ADMIN_URI, sender->uri);
+
+ sender->websockSvc.handle = sender;
+ sender->websockSvc.ready = psa_websocketTopicSender_ready;
+ sender->websockSvc.close = psa_websocketTopicSender_close;
+ sender->websockSvcId = celix_bundleContext_registerService(ctx, &sender->websockSvc,
+ WEBSOCKET_ADMIN_SERVICE_NAME, props);
+ } else {
+ sender->websockSvcId = -1;
+ }
+
+ if (sender->websockSvcId > 0) {
+ sender->scope = strndup(scope, 1024 * 1024);
+ sender->topic = strndup(topic, 1024 * 1024);
+
+ celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+ sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+ }
+
+ //register publisher services using a service factory
+ if (sender->websockSvcId > 0) {
+ sender->publisher.factory.handle = sender;
+ sender->publisher.factory.getService = psa_websocket_getPublisherService;
+ sender->publisher.factory.ungetService = psa_websocket_ungetPublisherService;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+ celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+ celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+ opts.factory = &sender->publisher.factory;
+ opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+ opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+ opts.properties = props;
+
+ sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+ }
+
+ if (sender->websockSvcId < 0) {
+ free(sender);
+ sender = NULL;
+ }
+
+ return sender;
+}
+
+void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender) {
+ if (sender != NULL) {
+ celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+ while (hashMapIterator_hasNext(&iter2)) {
+ psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
+ celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+ free(msgEntry);
+
+ }
+ hashMap_destroy(entry->msgEntries, false, false);
+
+ free(entry);
+ }
+ }
+ hashMap_destroy(sender->boundedServices.map, false, false);
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+ celixThreadMutex_destroy(&sender->boundedServices.mutex);
+
+ celix_bundleContext_unregisterService(sender->ctx, sender->websockSvcId);
+
+ free(sender->scope);
+ free(sender->topic);
+ free(sender->uri);
+ free(sender);
+ }
+}
+
+long pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t *sender) {
+ return sender->serializerSvcId;
+}
+
+const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t *sender) {
+ return sender->scope;
+}
+
+const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t *sender) {
+ return sender->topic;
+}
+
+const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sender) {
+ return sender->uri;
+}
+
+
+static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+ pubsub_websocket_topic_sender_t *sender = handle;
+ long bndId = celix_bundle_getId(requestingBundle);
+
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ psa_websocket_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+ if (entry != NULL) {
+ entry->getCount += 1;
+ } else {
+ entry = calloc(1, sizeof(*entry));
+ entry->getCount = 1;
+ entry->parent = sender;
+ entry->bndId = bndId;
+ entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+
+ int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+ if (rc == 0) {
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter);
+ void *key = hashMapEntry_getKey(hashMapEntry);
+ psa_websocket_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry));
+ sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
+ sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+ int major;
+ int minor;
+ version_getMajor(sendEntry->msgSer->msgVersion, &major);
+ version_getMinor(sendEntry->msgSer->msgVersion, &minor);
+ sendEntry->header.major = (uint8_t)major;
+ sendEntry->header.minor = (uint8_t)minor;
+ uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+ celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
+ hashMap_put(entry->msgEntries, key, sendEntry);
+ }
+ entry->service.handle = entry;
+ entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType;
+ entry->service.send = psa_websocket_topicPublicationSend;
+ hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+ } else {
+ L_ERROR("Error creating serializer map for websocket TopicSender %s/%s", sender->scope, sender->topic);
+ }
+ }
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+ return &entry->service;
+}
+
+static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+ pubsub_websocket_topic_sender_t *sender = handle;
+ long bndId = celix_bundle_getId(requestingBundle);
+
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ psa_websocket_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+ if (entry != NULL) {
+ entry->getCount -= 1;
+ }
+ if (entry != NULL && entry->getCount == 0) {
+ //free entry
+ hashMap_remove(sender->boundedServices.map, (void*)bndId);
+ int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+ if (rc != 0) {
+ L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+ }
+
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
+ celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+ free(msgEntry);
+ }
+ hashMap_destroy(entry->msgEntries, false, false);
+
+ free(entry);
+ }
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+pubsub_admin_sender_metrics_t* pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender) {
+ pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+ snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ size_t count = 0;
+ hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hashMapIterator_nextValue(&iter2);
+ count += 1;
+ }
+ }
+
+ result->msgMetrics = calloc(count, sizeof(*result));
+
+ iter = hashMapIterator_construct(sender->boundedServices.map);
+ int i = 0;
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+ while (hashMapIterator_hasNext(&iter2)) {
+ psa_websocket_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
+ celixThreadMutex_lock(&mEntry->metrics.mutex);
+ result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
+ result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
+ result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
+ result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
+ result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+ result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
+ result->msgMetrics[i].bndId = entry->bndId;
+ result->msgMetrics[i].typeId = mEntry->header.type;
+ snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
+ i += 1;
+ celixThreadMutex_unlock(&mEntry->metrics.mutex);
+ }
+ }
+
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+ result->nrOfmsgMetrics = (int)count;
+ return result;
+}
+
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+ int status = CELIX_SERVICE_EXCEPTION;
+ psa_websocket_bounded_service_entry_t *bound = handle;
+ pubsub_websocket_topic_sender_t *sender = bound->parent;
+ bool monitor = sender->metricsEnabled;
+ psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
+
+ //metrics updates
+ struct timespec sendTime;
+ struct timespec serializationStart;
+ struct timespec serializationEnd;
+ //int unknownMessageCountUpdate = 0;
+ int sendErrorUpdate = 0;
+ int serializationErrorUpdate = 0;
+ int sendCountUpdate = 0;
+
+ if (sender->sockConnection != NULL && entry != NULL) {
+ delay_first_send_for_late_joiners(sender);
+
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &serializationStart);
+ }
+
+ void *serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen);
+
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &serializationEnd);
+ }
+
+ if (status == CELIX_SUCCESS /*ser ok*/) {
+ unsigned char *hdrEncoded = calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char));
+
+ celixThreadMutex_lock(&entry->sendLock);
+
+ pubsub_websocket_msg_t *msg = malloc(sizeof(*msg) + sizeof(char[serializedOutputLen]));
+ pubsub_websocket_msg_header_t *msgHdr = &entry->header;
+ if (monitor) {
+ clock_gettime(CLOCK_REALTIME, &sendTime);
+ msgHdr->sendtimeSeconds = (uint64_t) sendTime.tv_sec;
+ msgHdr->sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
+ msgHdr->seqNr++;
+ }
+ memcpy(&msg->header, msgHdr, sizeof(pubsub_websocket_msg_header_t));
+
+ msg->payloadSize = (unsigned int) serializedOutputLen;
+ size_t hdr_size = sizeof(msg->header);
+ size_t ps_size = sizeof(msg->payloadSize);
+ size_t bytes_to_write = hdr_size + ps_size + serializedOutputLen;//sizeof(*msg);
+ memcpy(msg->payload, serializedOutput, serializedOutputLen);
+ int bytes_written = mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, (char *) msg, bytes_to_write);
+
+ celixThreadMutex_unlock(&entry->sendLock);
+ if (bytes_written == (int) bytes_to_write) {
+ sendCountUpdate = 1;
+ } else {
+ sendErrorUpdate = 1;
+ L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket.");
+ }
+
+ free(msg);
+ free(hdrEncoded);
+ free(serializedOutput);
+ } else {
+ serializationErrorUpdate = 1;
+ L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s",
+ entry->msgSer->msgName, sender->scope, sender->topic);
+ }
+ } else if (entry == NULL){
+ //unknownMessageCountUpdate = 1;
+ L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+ }
+
+
+ if (monitor && status == CELIX_SUCCESS) {
+ celixThreadMutex_lock(&entry->metrics.mutex);
+
+ long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
+ double diff = celix_difftime(&serializationStart, &serializationEnd);
+ double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
+ entry->metrics.averageSerializationTimeInSeconds = average;
+
+ if (entry->metrics.nrOfMessagesSend > 2) {
+ diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
+ n = entry->metrics.nrOfMessagesSend;
+ average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1);
+ entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+ }
+
+ entry->metrics.lastMessageSend = sendTime;
+ entry->metrics.nrOfMessagesSend += sendCountUpdate;
+ entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
+ entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+
+ celixThreadMutex_unlock(&entry->metrics.mutex);
+ }
+
+ return status;
+}
+
+static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle) {
+ //Connection succeeded so save connection to use for sending the messages
+ pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t *) handle;
+ sender->sockConnection = connection;
+}
+
+static void psa_websocketTopicSender_close(const struct mg_connection *connection __attribute__((unused)), void *handle) {
+ //Connection closed so reset connection
+ pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t *) handle;
+ sender->sockConnection = NULL;
+}
+
+static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender) {
+
+ static bool firstSend = true;
+
+ if (firstSend) {
+ L_INFO("PSA_WEBSOCKET_TP: Delaying first send for late joiners...\n");
+ sleep(FIRST_SEND_DELAY_IN_SECONDS);
+ firstSend = false;
+ }
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
new file mode 100644
index 0000000..35229a8
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
+#define CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+#include "pubsub_admin_metrics.h"
+
+typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
+
+pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
+ celix_bundle_context_t *ctx,
+ log_helper_t *logHelper,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId,
+ pubsub_serializer_service_t *ser);
+void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender);
+
+const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t *sender);
+const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t *sender);
+const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sender);
+
+long pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t *sender);
+
+/**
+ * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
+ */
+pubsub_admin_sender_metrics_t* pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index d37eb23..957745b 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -172,6 +172,27 @@ add_test(NAME pubsub_tcp_endpoint_tests COMMAND pubsub_tcp_endpoint_tests WORKIN
SETUP_TARGET_FOR_COVERAGE(pubsub_tcp_endpoint_tests_cov pubsub_tcp_endpoint_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_tcp_endpoint_tests/pubsub_tcp_endpoint_tests ..)
endif()
+add_celix_container(pubsub_websocket_tests
+ USE_CONFIG
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ USE_WEBSOCKETS=true
+ LISTENING_PORTS=8080
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::http_admin
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_websocket
+ pubsub_sut
+ pubsub_tst
+)
+target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_websocket_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>)
+SETUP_TARGET_FOR_COVERAGE(pubsub_websocket_tests_cov pubsub_websocket_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_websocket_tests/pubsub_websocket_tests ..)
+
if (BUILD_PUBSUB_PSA_ZMQ)
add_celix_container(pubsub_zmq_tests
USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index b73435d..2e13c17 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -20,6 +20,7 @@ tcp.static.bind.url=tcp://127.0.0.1:9000
tcp.static.connect.urls=tcp://127.0.0.1:9000
udpmc.static.bind.port=50678
udpmc.static.connect.socket_addresses=224.100.0.1:50678
+websocket.static.connect.socket_addresses=127.0.0.1:8080
#note only effective if run as root
thread.realtime.shed=SCHED_FIFO