You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/08/23 06:17:02 UTC

[celix] branch develop updated: Added pubsub admin websocket

This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new de7c3a1  Added pubsub admin websocket
     new 41d1ba9  Merge pull request #39 from dhbfischer/feature/pubsub_admin_websocket
de7c3a1 is described below

commit de7c3a19359a9908c56801dd081630400729f288
Author: dfischer <ma...@daanfischer.nl>
AuthorDate: Tue Aug 6 17:26:56 2019 +0200

    Added pubsub admin websocket
---
 bundles/pubsub/CMakeLists.txt                      |   1 +
 .../pubsub/pubsub_admin_websocket/CMakeLists.txt   |  53 ++
 .../pubsub_admin_websocket/src/psa_activator.c     | 128 ++++
 .../src/pubsub_psa_websocket_constants.h           |  48 ++
 .../src/pubsub_websocket_admin.c                   | 630 +++++++++++++++++
 .../src/pubsub_websocket_admin.h                   |  54 ++
 .../src/pubsub_websocket_common.c                  |  73 ++
 .../src/pubsub_websocket_common.h                  |  55 ++
 .../src/pubsub_websocket_topic_receiver.c          | 787 +++++++++++++++++++++
 .../src/pubsub_websocket_topic_receiver.h          |  50 ++
 .../src/pubsub_websocket_topic_sender.c            | 477 +++++++++++++
 .../src/pubsub_websocket_topic_sender.h            |  48 ++
 bundles/pubsub/test/CMakeLists.txt                 |  21 +
 bundles/pubsub/test/meta_data/ping.properties      |   1 +
 14 files changed, 2426 insertions(+)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 0373347..a084aa5 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,6 +34,7 @@ if (PUBSUB)
     add_subdirectory(pubsub_admin_tcp)
     add_subdirectory(pubsub_admin_nanomsg)
     add_subdirectory(pubsub_admin_udp_mc)
+    add_subdirectory(pubsub_admin_websocket)
     add_subdirectory(keygen)
     add_subdirectory(mock)
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
new file mode 100644
index 0000000..c992d5e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# External packages; only their include directories are consumed by this bundle.
+find_package(Jansson REQUIRED)
+find_package(UUID REQUIRED)
+
+if(NOT UUID_LIBRARY)
+    # e.g. on OSX no separate UUID library is found: fall back to empty values
+    set(UUID_LIBRARY "")
+    set(UUID_INCLUDE_DIRS "")
+endif()
+
+# The websocket pubsub admin (PSA) bundle.
+add_celix_bundle(celix_pubsub_admin_websocket
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+    VERSION "1.0.0"
+    GROUP "Celix/PubSub"
+    SOURCES
+        src/psa_activator.c
+        src/pubsub_websocket_admin.c
+        src/pubsub_websocket_topic_sender.c
+        src/pubsub_websocket_topic_receiver.c
+        src/pubsub_websocket_common.c
+)
+
+set_target_properties(celix_pubsub_admin_websocket
+    PROPERTIES
+        INSTALL_RPATH "$ORIGIN"
+)
+
+target_include_directories(celix_pubsub_admin_websocket
+    PRIVATE
+        ${JANSSON_INCLUDE_DIR}
+        ${UUID_INCLUDE_DIRS}
+        src
+)
+
+# All link dependencies in one place; Celix::shell_api provides the shell command service API.
+target_link_libraries(celix_pubsub_admin_websocket
+    PRIVATE
+        Celix::pubsub_spi
+        Celix::framework
+        Celix::dfi
+        Celix::log_helper
+        Celix::utils
+        Celix::http_admin_api
+        Celix::shell_api
+)
+
+install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub)
+add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
new file mode 100644
index 0000000..a1d36b0
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_websocket_admin.h"
+#include "command.h"
+
+typedef struct psa_websocket_activator { //bundle activator state, filled by psa_websocket_start and torn down by psa_websocket_stop
+    log_helper_t *logHelper; //created and started in psa_websocket_start; also handed to the admin
+
+    pubsub_websocket_admin_t *admin; //the websocket admin; used as handle for all services registered below
+
+    long serializersTrackerId; //tracker id for the json serializer services; -1 when not tracking
+
+    pubsub_admin_service_t adminService; //service struct registered as PUBSUB_ADMIN_SERVICE_NAME
+    long adminSvcId; //service id of adminService; -1 when not registered
+
+    pubsub_admin_metrics_service_t adminMetricsService; //service struct registered as PUBSUB_ADMIN_METRICS_SERVICE_NAME
+    long adminMetricsSvcId; //service id of adminMetricsService
+
+    command_service_t cmdSvc; //shell command service ("psa_websocket")
+    long cmdSvcId; //service id of cmdSvc; -1 when not registered
+} psa_websocket_activator_t;
+
+/**
+ * Starts the websocket pubsub admin bundle.
+ *
+ * Creates the log helper and the websocket admin and, when the admin was created,
+ * starts tracking json serializer services and registers the pubsub admin service,
+ * the admin metrics service and the "psa_websocket" shell command.
+ *
+ * @param act The activator state; all tracker/service ids are reset to -1 first.
+ * @param ctx The bundle context used for tracking and registering services.
+ * @return CELIX_SUCCESS when the admin was created, CELIX_BUNDLE_EXCEPTION otherwise.
+ */
+int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t *ctx) {
+    //mark every id as "not registered" so psa_websocket_stop never acts on an unset id
+    act->adminSvcId = -1L;
+    act->adminMetricsSvcId = -1L;
+    act->cmdSvcId = -1L;
+    act->serializersTrackerId = -1L;
+
+    logHelper_create(ctx, &act->logHelper);
+    logHelper_start(act->logHelper);
+
+    act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
+    celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+    //track serializers (only json)
+    if (status == CELIX_SUCCESS) {
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.filter = "(pubsub.serializer=json)";
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = act->admin;
+        opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
+        opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
+        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register pubsub admin service
+    if (status == CELIX_SUCCESS) {
+        pubsub_admin_service_t *psaSvc = &act->adminService;
+        psaSvc->handle = act->admin;
+        psaSvc->matchPublisher = pubsub_websocketAdmin_matchPublisher;
+        psaSvc->matchSubscriber = pubsub_websocketAdmin_matchSubscriber;
+        psaSvc->matchDiscoveredEndpoint = pubsub_websocketAdmin_matchDiscoveredEndpoint;
+        psaSvc->setupTopicSender = pubsub_websocketAdmin_setupTopicSender;
+        psaSvc->teardownTopicSender = pubsub_websocketAdmin_teardownTopicSender;
+        psaSvc->setupTopicReceiver = pubsub_websocketAdmin_setupTopicReceiver;
+        psaSvc->teardownTopicReceiver = pubsub_websocketAdmin_teardownTopicReceiver;
+        psaSvc->addDiscoveredEndpoint = pubsub_websocketAdmin_addDiscoveredEndpoint;
+        psaSvc->removeDiscoveredEndpoint = pubsub_websocketAdmin_removeDiscoveredEndpoint;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_WEBSOCKET_ADMIN_TYPE);
+
+        act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+    }
+
+    //register pubsub admin metrics service
+    if (status == CELIX_SUCCESS) {
+        act->adminMetricsService.handle = act->admin;
+        act->adminMetricsService.metrics = pubsub_websocketAdmin_metrics;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_WEBSOCKET_ADMIN_TYPE);
+
+        act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
+    }
+
+    //register shell command service; skipped without an admin, the command would otherwise run with a NULL handle
+    if (status == CELIX_SUCCESS) {
+        act->cmdSvc.handle = act->admin;
+        act->cmdSvc.executeCommand = pubsub_websocketAdmin_executeCommand;
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_websocket");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_websocket");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the websocket PSA");
+        act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+    }
+
+    return status;
+}
+
+int psa_websocket_stop(psa_websocket_activator_t *act, celix_bundle_context_t *ctx) { //undoes psa_websocket_start; always returns CELIX_SUCCESS
+    celix_bundleContext_unregisterService(ctx, act->adminSvcId); //services and tracker go first ...
+    celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+    celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
+    celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+    pubsub_websocketAdmin_destroy(act->admin); //... so the admin (their handle) is destroyed only afterwards; NULL is accepted
+
+    logHelper_stop(act->logHelper); //the log helper outlives the admin, which keeps a pointer to it
+    logHelper_destroy(&act->logHelper);
+
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_websocket_activator_t, psa_websocket_start, psa_websocket_stop); //NOTE(review): presumably generates the bundle activator entry points around start/stop - confirm in celix_api.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
new file mode 100644
index 0000000..c6fba82
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_
+#define PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_
+
+#define PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE        30 //used when PSA_WEBSOCKET_QOS_SAMPLE_SCORE is not configured
+#define PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE       70 //used when PSA_WEBSOCKET_QOS_CONTROL_SCORE is not configured
+#define PSA_WEBSOCKET_DEFAULT_SCORE                   30 //used when PSA_WEBSOCKET_DEFAULT_SCORE is not configured
+
+#define PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY            "PSA_WEBSOCKET_QOS_SAMPLE_SCORE" //bundle context property, read as double
+#define PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY           "PSA_WEBSOCKET_QOS_CONTROL_SCORE" //bundle context property, read as double
+#define PSA_WEBSOCKET_DEFAULT_SCORE_KEY               "PSA_WEBSOCKET_DEFAULT_SCORE" //bundle context property, read as double
+
+
+#define PSA_WEBSOCKET_METRICS_ENABLED                 "PSA_WEBSOCKET_METRICS_ENABLED" //NOTE(review): presumably a bundle context property key - confirm against the topic sender/receiver
+#define PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED         true
+
+#define PUBSUB_WEBSOCKET_VERBOSE_KEY                  "PSA_WEBSOCKET_VERBOSE" //bundle context property, read as bool
+#define PUBSUB_WEBSOCKET_VERBOSE_DEFAULT              true
+
+#define PUBSUB_WEBSOCKET_ADMIN_TYPE                   "websocket" //admin type value set on the registered admin services
+#define PUBSUB_WEBSOCKET_ADDRESS_KEY                  "websocket.socket_address" //endpoint property holding the socket address
+#define PUBSUB_WEBSOCKET_PORT_KEY                     "websocket.socket_port" //endpoint property holding the socket port (read as long)
+
+/**
+ * The static urls which a subscriber should try to connect to.
+ * The urls are space separated.
+ */
+#define PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES    "websocket.static.connect.socket_addresses"
+
+#endif /* PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
new file mode 100644
index 0000000..a7fbe29
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory.h>
+#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
+#include <ip_utils.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_websocket_admin.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_topic_sender.h"
+#include "pubsub_websocket_topic_receiver.h"
+#include "pubsub_websocket_common.h"
+
+/* Logging shorthands, one per log level. Each expands to a logHelper_log call on
+ * psa->log, so a variable named `psa` must be in scope wherever they are used. */
+#define L_DEBUG(...) logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...)  logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...)  logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_websocket_admin { //state of the websocket pubsub admin; every map is guarded by the mutex next to it
+    celix_bundle_context_t *ctx;
+    log_helper_t *log; //used by the L_DEBUG/L_INFO/L_WARN/L_ERROR macros
+    const char *fwUUID; //framework uuid as read from the bundle context; passed to pubsubEndpoint_create
+
+    double qosSampleScore; //scores handed to pubsub_utils_matchPublisher/pubsub_utils_matchSubscriber
+    double qosControlScore;
+    double defaultScore;
+
+    bool verbose;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t*
+    } serializers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t*
+    } topicSenders;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_receiver_t*
+    } topicReceivers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+    } discoveredEndpoints;
+
+};
+
+typedef struct psa_websocket_serializer_entry { //one tracked serializer service, stored in the serializers map by svcId
+    const char *serType; //value of PUBSUB_SERIALIZER_TYPE_KEY; the pointer comes from the service properties and is not copied
+    long svcId; //service id of the serializer service
+    pubsub_serializer_service_t *svc; //the serializer service itself
+} psa_websocket_serializer_entry_t;
+
+static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+
+
+/**
+ * Creates a websocket pubsub admin.
+ *
+ * Reads the verbose flag, the framework uuid and the match scores from the bundle
+ * context properties and creates the (initially empty) serializer, topic sender,
+ * topic receiver and discovered endpoint maps with their mutexes.
+ *
+ * @param ctx       The bundle context; stored and used for property lookups.
+ * @param logHelper The log helper used for all logging of this admin; not owned.
+ * @return The new admin, or NULL when memory could not be allocated. Destroy with
+ *         pubsub_websocketAdmin_destroy.
+ */
+pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+    pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
+    if (psa == NULL) {
+        //the L_* macros need a valid psa, so log through the helper directly
+        logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, "[PSA_WEBSOCKET] Cannot allocate memory for the websocket admin");
+        return NULL;
+    }
+
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_WEBSOCKET_VERBOSE_KEY, PUBSUB_WEBSOCKET_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_DEFAULT_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
+
+    //serializers are keyed by service id, hence no hash/equals functions
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    //the remaining maps are keyed by strings
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    return psa;
+}
+
+/**
+ * Destroys the admin together with all topic senders, topic receivers, discovered
+ * endpoints and serializer entries it still holds. A NULL admin is ignored.
+ */
+void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
+    if (psa == NULL) {
+        return;
+    }
+
+    //note: assuming all psa registered services and service trackers are already removed.
+
+    //1) destroy the values of every map, each under its own lock
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t senderIter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&senderIter)) {
+        pubsub_websocketTopicSender_destroy(hashMapIterator_nextValue(&senderIter));
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_iterator_t receiverIter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&receiverIter)) {
+        pubsub_websocketTopicReceiver_destroy(hashMapIterator_nextValue(&receiverIter));
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    hash_map_iterator_t endpointIter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&endpointIter)) {
+        celix_properties_destroy(hashMapIterator_nextValue(&endpointIter));
+    }
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    hash_map_iterator_t serializerIter = hashMapIterator_construct(psa->serializers.map);
+    while (hashMapIterator_hasNext(&serializerIter)) {
+        free(hashMapIterator_nextValue(&serializerIter));
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    //2) destroy the mutexes and maps; only the sender/receiver maps free their keys
+    celixThreadMutex_destroy(&psa->topicSenders.mutex);
+    hashMap_destroy(psa->topicSenders.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+    hashMap_destroy(psa->topicReceivers.map, true, false);
+
+    celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+    hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+
+    celixThreadMutex_destroy(&psa->serializers.mutex);
+    hashMap_destroy(psa->serializers.map, false, false);
+
+    free(psa);
+}
+
+/**
+ * Service tracker callback: remembers a serializer service under its service id.
+ * Services without a PUBSUB_SERIALIZER_TYPE_KEY property are ignored, as are
+ * service ids that are already known.
+ */
+void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    const char *type = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    if (type == NULL) {
+        L_INFO("[PSA_WEBSOCKET] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+        return;
+    }
+
+    long id = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    if (hashMap_get(psa->serializers.map, (void*)id) == NULL) {
+        //first time this service id is seen: create and store an entry
+        psa_websocket_serializer_entry_t *newEntry = calloc(1, sizeof(*newEntry));
+        newEntry->serType = type;
+        newEntry->svcId = id;
+        newEntry->svc = svc;
+        hashMap_put(psa->serializers.map, (void*)id, newEntry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+/**
+ * Service tracker callback: forgets a serializer service and destroys every topic
+ * sender and topic receiver that uses it.
+ *
+ * Note that it is the responsibility of the topology manager to create new topic
+ * senders/receivers afterwards.
+ */
+void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_websocket_admin_t *psa = handle;
+    long id = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+    //take the serializer entry out of the map
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_websocket_serializer_entry_t *removed = hashMap_remove(psa->serializers.map, (void*)id);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (removed == NULL) {
+        return; //unknown serializer, nothing depends on it
+    }
+
+    //destroy all topic senders using the serializer
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t senderIter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&senderIter)) {
+        hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&senderIter);
+        pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+        if (sender == NULL || removed->svcId != pubsub_websocketTopicSender_serializerSvcId(sender)) {
+            continue;
+        }
+        char *senderKey = hashMapEntry_getKey(senderEntry);
+        hashMapIterator_remove(&senderIter);
+        pubsub_websocketTopicSender_destroy(sender);
+        free(senderKey);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    //destroy all topic receivers using the serializer
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_iterator_t receiverIter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&receiverIter)) {
+        hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&receiverIter);
+        pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
+        if (receiver == NULL || removed->svcId != pubsub_websocketTopicReceiver_serializerSvcId(receiver)) {
+            continue;
+        }
+        char *receiverKey = hashMapEntry_getKey(receiverEntry);
+        hashMapIterator_remove(&receiverIter);
+        pubsub_websocketTopicReceiver_destroy(receiver);
+        free(receiverKey);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    free(removed);
+}
+
+/**
+ * Calculates how well this admin matches a requested publisher.
+ *
+ * Delegates to pubsub_utils_matchPublisher with this admin's type and configured
+ * scores; topicProperties and outSerializerSvcId are passed through to that call.
+ *
+ * @param handle   The pubsub_websocket_admin_t instance.
+ * @param outScore Receives the match score; may be NULL, in which case the score
+ *                 is discarded (same contract as pubsub_websocketAdmin_matchSubscriber).
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_websocket_admin_t *psa = handle;
+    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+                                               psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+                                               topicProperties, outSerializerSvcId);
+    if (outScore != NULL) {
+        *outScore = score;
+    }
+
+    return status;
+}
+
+/**
+ * Calculates how well this admin matches a subscriber service by delegating to
+ * pubsub_utils_matchSubscriber with this admin's type and configured scores.
+ * The score is written to outScore when that pointer is not NULL.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_websocket_admin_t *psa = handle;
+    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
+
+    const double matchScore = pubsub_utils_matchSubscriber(
+            psa->ctx, svcProviderBndId, svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+            topicProperties, outSerializerSvcId);
+
+    if (outScore != NULL) {
+        *outScore = matchScore;
+    }
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Checks whether a discovered endpoint belongs to this admin type by delegating to
+ * pubsub_utils_matchEndpoint. The result is written to outMatch when that pointer
+ * is not NULL. Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+    pubsub_websocket_admin_t *psa = handle;
+    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
+
+    const bool isMatch = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, NULL);
+
+    if (outMatch != NULL) {
+        *outMatch = isMatch;
+    }
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates and stores a TopicSender for the given scope/topic.
+ *
+ * Steps:
+ *  1) Create the TopicSender with the serializer identified by serializerSvcId
+ *  2) Store the TopicSender under its scope:topic key
+ *  3) Set outPublisherEndpoint to a newly created publisher endpoint
+ *
+ * @param handle               The pubsub_websocket_admin_t instance.
+ * @param scope                The topic scope.
+ * @param topic                The topic name.
+ * @param topicProperties      Not used by this function.
+ * @param serializerSvcId      Service id of a serializer known to this admin.
+ * @param outPublisherEndpoint Receives the new endpoint on success; left untouched
+ *                             when no sender was created. May be NULL.
+ * @return Always CELIX_SUCCESS; failures are logged and leave
+ *         *outPublisherEndpoint unset.
+ */
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    //ownership of key moves to the topicSenders map on success, otherwise it is freed here
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+    if (sender == NULL) {
+        psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicSender %s/%s", scope, topic);
+        }
+        if (sender != NULL) {
+            //sender != NULL implies serEntry != NULL
+            const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                serType, NULL);
+
+            //Set endpoint visibility to local because the http server handles discovery
+            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicSenders.map, key, sender);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Error creating a TopicSender");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+        *outPublisherEndpoint = newEndpoint;
+    }
+
+    return status;
+}
+
+/**
+ * Removes the TopicSender for the given scope/topic from the admin and destroys it.
+ * Logs an error when no such sender exists. Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    //temporary key, only used for the lookup; the map owns its own copy
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_entry_t *found = hashMap_getEntry(psa->topicSenders.map, lookupKey);
+    if (found == NULL) {
+        L_ERROR("[PSA_WEBSOCKET] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+    } else {
+        //grab the stored key before the entry disappears from the map
+        char *storedKey = hashMapEntry_getKey(found);
+        pubsub_websocket_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, lookupKey);
+        free(storedKey);
+        pubsub_websocketTopicSender_destroy(sender);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    free(lookupKey);
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates and stores a TopicReceiver for the given scope/topic and offers it every
+ * already discovered publisher endpoint.
+ *
+ * @param handle                The pubsub_websocket_admin_t instance.
+ * @param scope                 The topic scope.
+ * @param topic                 The topic name.
+ * @param topicProperties       Passed on to pubsub_websocketTopicReceiver_create.
+ * @param serializerSvcId       Service id of a serializer known to this admin.
+ * @param outSubscriberEndpoint Receives the new subscriber endpoint on success; left
+ *                              untouched when no receiver was created. May be NULL.
+ * @return Always CELIX_SUCCESS; failures are logged and leave
+ *         *outSubscriberEndpoint unset.
+ */
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    //ownership of key moves to the topicReceivers map on success, otherwise it is freed here
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+    if (receiver == NULL) {
+        psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicReceiver %s/%s", scope, topic);
+        }
+        if (receiver != NULL) {
+            //receiver != NULL implies serEntry != NULL
+            const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+
+            //Set endpoint visibility to local because the http server handles discovery
+            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicReceivers.map, key, receiver);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Error creating a TopicReceiver.");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    //newEndpoint is only set for a receiver created by this call; offer it all known publisher endpoints
+    if (receiver != NULL && newEndpoint != NULL) {
+        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+            const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+                pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+            }
+        }
+        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    }
+
+    if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+        *outSubscriberEndpoint = newEndpoint;
+    }
+
+    celix_status_t status = CELIX_SUCCESS;
+    return status;
+}
+
+/**
+ * Removes the TopicReceiver for the given scope/topic from the admin and destroys
+ * it. Nothing happens when no such receiver exists.
+ *
+ * @param handle The pubsub_websocket_admin_t instance.
+ * @param scope  The topic scope.
+ * @param topic  The topic name.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    //temporary key, only used for the lookup; the map owns its own copy
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+    free(key);
+    if (entry != NULL) {
+        //read key and value before the entry is removed from the map
+        char *receiverKey = hashMapEntry_getKey(entry);
+        pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+        hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+        free(receiverKey);
+        pubsub_websocketTopicReceiver_destroy(receiver);
+    }
+    //release the lock taken above; locking a second time here would leave the mutex held forever
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+/**
+ * Connects a receiver to a discovered endpoint when the endpoint's scope and topic
+ * are equal to the receiver's scope and topic.
+ *
+ * A publisher endpoint without a websocket address or with a negative port is
+ * rejected: its properties are logged as warnings and CELIX_BUNDLE_EXCEPTION is returned.
+ *
+ * @param psa      The websocket admin (used by the logging macros).
+ * @param receiver The receiver that may be connected.
+ * @param endpoint The discovered endpoint properties.
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION for an incomplete publisher endpoint.
+ */
+static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+    const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *sockAddress = celix_properties_get(endpoint, PUBSUB_WEBSOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_WEBSOCKET_PORT_KEY, -1L);
+
+    bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (publisher && (sockAddress == NULL || sockPort < 0)) {
+        L_WARN("[PSA WEBSOCKET] Error got endpoint without websocket address/port or endpoint type. Properties:");
+        const char *key = NULL;
+        CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+            L_WARN("[PSA WEBSOCKET] |- %s=%s\n", key, celix_properties_get(endpoint, key, NULL));
+        }
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        if (eScope != NULL && eTopic != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 &&
+            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+            char *uri = psa_websocket_createURI(eScope, eTopic);
+            if (uri != NULL) {
+                pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress, sockPort, uri);
+                //connectTo takes a const char*, so the heap string returned by
+                //psa_websocket_createURI is still owned here and must be released.
+                free(uri);
+            }
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Registers a discovered endpoint with the admin.
+ *
+ * Publisher endpoints are first offered to every known topic receiver so that
+ * matching receivers can connect. Afterwards a copy of the endpoint properties is
+ * stored in psa->discoveredEndpoints.map, keyed by the uuid string of that copy.
+ *
+ * @param handle   The pubsub_websocket_admin_t instance.
+ * @param endpoint The discovered endpoint properties (not modified, not kept).
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    const char *endpointType = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    bool isPublisher = endpointType != NULL &&
+                       strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, endpointType, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (isPublisher) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicReceivers.map); hashMapIterator_hasNext(&it);) {
+            pubsub_websocket_topic_receiver_t *candidate = hashMapIterator_nextValue(&it);
+            pubsub_websocketAdmin_connectEndpointToReceiver(psa, candidate, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    //keep a private copy; the map key points into that copy
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    celix_properties_t *stored = celix_properties_copy(endpoint);
+    const char *storedUuid = celix_properties_get(stored, PUBSUB_ENDPOINT_UUID, NULL);
+    hashMap_put(psa->discoveredEndpoints.map, (void*)storedUuid, stored);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    return CELIX_SUCCESS;
+}
+
+
+/**
+ * Disconnects a receiver from a discovered endpoint when the endpoint's scope and
+ * topic are equal to the receiver's scope and topic.
+ *
+ * @param psa      The websocket admin (not used by the body itself).
+ * @param receiver The receiver that may be disconnected.
+ * @param endpoint The endpoint properties that are going away.
+ * @return Always CELIX_SUCCESS.
+ */
+static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+    const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+
+    if (eScope != NULL && eTopic != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 1024 * 1024) == 0) {
+        char *uri = psa_websocket_createURI(eScope, eTopic);
+        if (uri != NULL) {
+            pubsub_websocketTopicReceiver_disconnectFrom(receiver, uri);
+            //the URI is a heap string created only for this lookup; release it
+            free(uri);
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Unregisters a discovered endpoint from the admin.
+ *
+ * Publisher endpoints are first offered to every known topic receiver so that
+ * matching receivers can disconnect. Afterwards the stored copy with the same uuid
+ * is removed from psa->discoveredEndpoints.map and destroyed.
+ *
+ * @param handle   The pubsub_websocket_admin_t instance.
+ * @param endpoint The endpoint properties that are going away.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    const char *endpointType = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+    bool isPublisher = endpointType != NULL &&
+                       strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, endpointType, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (isPublisher) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicReceivers.map); hashMapIterator_hasNext(&it);) {
+            pubsub_websocket_topic_receiver_t *candidate = hashMapIterator_nextValue(&it);
+            pubsub_websocketAdmin_disconnectEndpointFromReceiver(psa, candidate, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    //take the stored copy out of the map under the lock, destroy it outside the lock
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    const char *endpointUuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+    celix_properties_t *removed = hashMap_remove(psa->discoveredEndpoints.map, (void*)endpointUuid);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    if (removed != NULL) {
+        celix_properties_destroy(removed);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Shell command callback: prints an overview of all topic senders and topic
+ * receivers of this admin to the given output stream.
+ *
+ * For both sections psa->serializers.mutex is taken before the sender/receiver
+ * mutex, and the two are released in reverse order.
+ *
+ * @param handle      The pubsub_websocket_admin_t instance.
+ * @param commandLine Unused.
+ * @param out         Stream the overview is written to.
+ * @param errStream   Unused.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    //--- senders ---
+    fprintf(out, "\n");
+    fprintf(out, "Topic Senders:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicSenders.map); hashMapIterator_hasNext(&it);) {
+        pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&it);
+        long svcId = pubsub_websocketTopicSender_serializerSvcId(sender);
+        psa_websocket_serializer_entry_t *ser = hashMap_get(psa->serializers.map, (void*)svcId);
+        const char *serType = "!Error!"; //shown when the serializer is no longer registered
+        if (ser != NULL) {
+            serType = ser->serType;
+        }
+        const char *senderScope = pubsub_websocketTopicSender_scope(sender);
+        const char *senderTopic = pubsub_websocketTopicSender_topic(sender);
+        const char *senderUrl = pubsub_websocketTopicSender_url(sender);
+        fprintf(out, "|- Topic Sender %s/%s\n", senderScope, senderTopic);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- url            = %s\n", senderUrl);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    //--- receivers ---
+    fprintf(out, "\n");
+    fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicReceivers.map); hashMapIterator_hasNext(&it);) {
+        pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&it);
+        long svcId = pubsub_websocketTopicReceiver_serializerSvcId(receiver);
+        psa_websocket_serializer_entry_t *ser = hashMap_get(psa->serializers.map, (void*)svcId);
+        const char *serType = "!Error!";
+        if (ser != NULL) {
+            serType = ser->serType;
+        }
+        const char *receiverScope = pubsub_websocketTopicReceiver_scope(receiver);
+        const char *receiverTopic = pubsub_websocketTopicReceiver_topic(receiver);
+
+        //listConnections fills both lists with heap strings; they are freed right after printing
+        celix_array_list_t *connectedUrls = celix_arrayList_create();
+        celix_array_list_t *pendingUrls = celix_arrayList_create();
+        pubsub_websocketTopicReceiver_listConnections(receiver, connectedUrls, pendingUrls);
+
+        fprintf(out, "|- Topic Receiver %s/%s\n", receiverScope, receiverTopic);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        for (int idx = 0; idx < celix_arrayList_size(connectedUrls); ++idx) {
+            char *entryUrl = celix_arrayList_get(connectedUrls, idx);
+            fprintf(out, "   |- connected url   = %s\n", entryUrl);
+            free(entryUrl);
+        }
+        for (int idx = 0; idx < celix_arrayList_size(pendingUrls); ++idx) {
+            char *entryUrl = celix_arrayList_get(pendingUrls, idx);
+            fprintf(out, "   |- unconnected url = %s\n", entryUrl);
+            free(entryUrl);
+        }
+        celix_arrayList_destroy(connectedUrls);
+        celix_arrayList_destroy(pendingUrls);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+    fprintf(out, "\n");
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Collects the metrics of all topic senders and topic receivers of this admin.
+ *
+ * The result is heap allocated; result->senders and result->receivers are newly
+ * created lists holding the per-sender / per-receiver metrics objects.
+ *
+ * @param handle The pubsub_websocket_admin_t instance.
+ * @return The collected metrics, or NULL when the result could not be allocated.
+ */
+pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle) {
+    pubsub_websocket_admin_t *psa = handle;
+    pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
+    if (result == NULL) {
+        //out of memory: bail out instead of writing through a NULL pointer below
+        return NULL;
+    }
+    snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_WEBSOCKET_ADMIN_TYPE);
+    result->senders = celix_arrayList_create();
+    result->receivers = celix_arrayList_create();
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        pubsub_admin_sender_metrics_t *metrics = pubsub_websocketTopicSender_metrics(sender);
+        celix_arrayList_add(result->senders, metrics);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+        pubsub_admin_receiver_metrics_t *metrics = pubsub_websocketTopicReceiver_metrics(receiver);
+        celix_arrayList_add(result->receivers, metrics);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    return result;
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
new file mode 100644
index 0000000..62a14a9
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+#define CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_api.h"
+#include "log_helper.h"
+#include "pubsub_psa_websocket_constants.h"
+
+//Opaque handle of the websocket pubsub admin; the struct is defined in pubsub_websocket_admin.c.
+typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
+
+//Lifecycle of the admin instance.
+pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
+
+//Matching callbacks. In all callbacks below `handle` is the pubsub_websocket_admin_t instance.
+//NOTE(review): presumably these back the pubsub admin service matching functions - confirm in psa_activator.c.
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+
+//Set up / tear down the topic sender for a scope/topic.
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+//Set up / tear down the topic receiver for a scope/topic.
+//Teardown removes the receiver from the admin and destroys it; it returns CELIX_SUCCESS even when none exists.
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+
+//Discovered endpoint bookkeeping. The admin stores its own copy of an added endpoint (keyed by endpoint uuid)
+//and, for publisher endpoints, connects/disconnects the topic receivers with the same scope and topic.
+celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+
+//Service tracker callbacks for serializer services.
+void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
+//Shell command callback: prints an overview of all topic senders and receivers to outStream.
+//commandLine and errStream are not used by the implementation.
+celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+//Returns a heap allocated metrics object with the metrics of all topic senders and receivers.
+//NOTE(review): presumably the caller releases the result - confirm against the metrics service contract.
+pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
new file mode 100644
index 0000000..ce85e40
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory.h>
+#include <assert.h>
+#include <stdio.h>
+#include "pubsub_websocket_common.h"
+
+/**
+ * Derives the local message type id from a message type name by hashing the name.
+ *
+ * @param handle    Unused.
+ * @param msgType   The message type name.
+ * @param msgTypeId Output: utils_stringHash(msgType).
+ * @return Always 0.
+ */
+int psa_websocket_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
+    const unsigned int typeHash = utils_stringHash(msgType);
+    *msgTypeId = typeHash;
+    return 0;
+}
+
+/**
+ * Checks whether a received message header is version compatible with the local
+ * message version.
+ *
+ * @param msgVersion The local message version, may be NULL.
+ * @param hdr        The received message header.
+ * @return true when the header version is 0.0 (no check), or when the majors are
+ *         equal and the header minor is at least the local minor; false otherwise,
+ *         including when msgVersion is NULL.
+ */
+bool psa_websocket_checkVersion(version_pt msgVersion, const pubsub_websocket_msg_header_t *hdr) {
+    //a 0.0 header version disables the check
+    if (hdr->major == 0 && hdr->minor == 0) {
+        return true;
+    }
+
+    //nothing to compare against
+    if (msgVersion == NULL) {
+        return false;
+    }
+
+    int localMajor = 0;
+    int localMinor = 0;
+    version_getMajor(msgVersion, &localMajor);
+    version_getMinor(msgVersion, &localMinor);
+
+    //a different major is incompatible; within the same major the sender must have
+    //an equal or greater minor
+    return hdr->major == ((unsigned char)localMajor) && hdr->minor >= ((unsigned char)localMinor);
+}
+
+/**
+ * Builds the 5 byte scope/topic filter: the first two characters of scope followed
+ * by the first two characters of topic, zero terminated.
+ *
+ * A part whose string is NULL or shorter than two characters is left as zero bytes.
+ *
+ * @param scope  Scope string, may be NULL.
+ * @param topic  Topic string, may be NULL.
+ * @param filter Output buffer of at least 5 bytes.
+ */
+void psa_websocket_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter) {
+    memset(filter, '\0', 5);
+
+    if (scope != NULL && strnlen(scope, 3) >= 2)  {
+        memcpy(&filter[0], scope, 2);
+    }
+    if (topic != NULL && strnlen(topic, 3) >= 2)  {
+        memcpy(&filter[2], topic, 2);
+    }
+}
+
+/**
+ * Creates the websocket URI for a scope/topic combination.
+ *
+ * @param scope Scope, may be NULL in which case "default" is used.
+ * @param topic Topic, must not be NULL for a URI to be created.
+ * @return A heap allocated string "/pubsub/<scope>/<topic>" that the caller must
+ *         free(), or NULL when topic is NULL or the string could not be allocated.
+ */
+char *psa_websocket_createURI(const char *scope, const char *topic) {
+    if (topic == NULL) {
+        return NULL;
+    }
+
+    char *uri = NULL;
+    int rc;
+    if (scope != NULL) {
+        rc = asprintf(&uri, "/pubsub/%s/%s", scope, topic);
+    } else {
+        rc = asprintf(&uri, "/pubsub/default/%s", topic);
+    }
+
+    //when asprintf fails the content of uri is undefined, so never hand it out
+    return rc < 0 ? NULL : uri;
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
new file mode 100644
index 0000000..89ec65a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_COMMON_H
+#define CELIX_PUBSUB_WEBSOCKET_COMMON_H
+
+#include <utils.h>
+#include <stdint.h>
+
+#include "version.h"
+
+
+//Header that precedes the payload of every websocket pubsub message.
+struct pubsub_websocket_msg_header {
+    uint32_t type; //msg type id (hash of fqn)
+    uint8_t major; //major version of the message type; major == 0 && minor == 0 disables the version check
+    uint8_t minor; //minor version of the message type
+    uint32_t seqNr; //sequence number (NOTE(review): presumably per sender and message type - confirm in the topic sender)
+    unsigned char originUUID[16]; //uuid of the origin of the message, 16 raw bytes
+    uint64_t sendtimeSeconds; //seconds since epoch
+    uint64_t sendTimeNanoseconds; //ns since epoch
+};
+
+typedef struct pubsub_websocket_msg_header pubsub_websocket_msg_header_t;
+
+//A complete websocket pubsub message: header, payload size and the payload itself.
+struct pubsub_websocket_msg {
+    pubsub_websocket_msg_header_t header;
+    unsigned int payloadSize; //NOTE(review): presumably the number of bytes in payload - confirm in the topic sender
+    char payload[]; //flexible array member; storage must be allocated together with the struct
+};
+
+typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
+
+//Writes the string hash of msgType to *msgTypeId and returns 0; handle is unused.
+int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
+//Fills filter (at least 5 bytes) with the first two chars of scope and of topic, zero terminated.
+//A part that is NULL or shorter than two characters is left as zero bytes.
+void psa_websocket_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
+//Returns a heap allocated "/pubsub/<scope>/<topic>" ("default" is used when scope is NULL), or NULL when
+//topic is NULL. The caller must free() the result.
+char *psa_websocket_createURI(const char *scope, const char *topic);
+
+//Returns true when hdr has version 0.0, or when hdr's major equals the major of msgVersion and hdr's minor is
+//greater than or equal to the minor of msgVersion. Returns false otherwise, also when msgVersion is NULL.
+bool psa_websocket_checkVersion(version_pt msgVersion, const pubsub_websocket_msg_header_t *hdr);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
new file mode 100644
index 0000000..4a41a0e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -0,0 +1,787 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include <pubsub_endpoint.h>
+#include <arpa/inet.h>
+#include <log_helper.h>
+#include <math.h>
+#include "pubsub_websocket_topic_receiver.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_common.h"
+
+#include <uuid/uuid.h>
+#include <pubsub_admin_metrics.h>
+#include <http_admin/api.h>
+
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
+
+//Logging shorthands for this file. Every macro expands to a logHelper_log call on
+//receiver->logHelper, so a non-NULL variable named `receiver` must be in scope where they are used.
+#define L_DEBUG(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+//Buffer of received websocket messages together with their receive times.
+//Requested connection entries hold a pointer to the receiver's buffer (see entry->recvBuffer).
+typedef struct pubsub_websocket_rcv_buffer {
+    celix_thread_mutex_t mutex;   //NOTE(review): presumably guards list and rcvTimes - confirm in the data callback / receive thread
+    celix_array_list_t *list;     //List of received websocket messages (type: pubsub_websocket_msg_t *)
+    celix_array_list_t *rcvTimes; //Corresponding receive times of the received websocket messages (rcvTimes[i] -> list[i])
+} pubsub_websocket_rcv_buffer_t;
+
+//State of one topic receiver: the subscribers it serves, the connections it was asked
+//to make and the thread + buffer used to process received messages.
+struct pubsub_websocket_topic_receiver {
+    celix_bundle_context_t *ctx;
+    log_helper_t *logHelper; //used by the L_* logging macros
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
+    char *scope; //own copy, freed on destroy
+    char *topic; //own copy, freed on destroy
+    char scopeAndTopicFilter[5]; //filled by psa_websocket_setScopeAndTopicFilter
+    char *uri; //result of psa_websocket_createURI(scope, topic), freed on destroy
+    bool metricsEnabled; //read from the PSA_WEBSOCKET_METRICS_ENABLED property on creation
+
+    pubsub_websocket_rcv_buffer_t recvBuffer;
+
+    struct {
+        celix_thread_t thread;
+        celix_thread_mutex_t mutex;
+        bool running; //set to true before the thread is started; destroy sets it to false under mutex and joins the thread
+    } recvThread;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = url (host:port), value = psa_websocket_requested_connection_entry_t*
+        bool allConnected; //true if all requested connections are connected
+    } requestedConnections;
+
+    long subscriberTrackerId; //id of the service tracker for subscriber services, stopped on destroy
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = bnd id, value = psa_websocket_subscriber_entry_t
+        bool allInitialized;
+    } subscribers;
+};
+
+//One connection the receiver was asked to make, stored in receiver->requestedConnections.map.
+//key, socketAddress and uri are heap strings owned by the entry (freed when the receiver is destroyed).
+typedef struct psa_websocket_requested_connection_entry {
+    pubsub_websocket_rcv_buffer_t *recvBuffer; //points to the receive buffer of the owning receiver
+    char *key; //host:port
+    char *socketAddress;
+    long socketPort;
+    char *uri;
+    struct mg_connection *sockConnection; //closed with mg_close_connection on destroy when connected is true
+    int connectRetryCount;
+    bool connected;
+    bool statically; //true if the connection is statically configured through the topic properties.
+} psa_websocket_requested_connection_entry_t;
+
+//Receive metrics for one combination of message type and origin (see the `metrics` map of
+//psa_websocket_subscriber_entry_t).
+//NOTE(review): the code updating these fields is outside this part of the file; the field
+//descriptions below follow the field names only - confirm against the update code.
+typedef struct psa_websocket_subscriber_metrics_entry_t {
+    unsigned int msgTypeId;
+    uuid_t origin;
+
+    unsigned long nrOfMessagesReceived;
+    unsigned long nrOfSerializationErrors;
+    struct timespec lastMessageReceived;
+    double averageTimeBetweenMessagesInSeconds;
+    double averageSerializationTimeInSeconds;
+    double averageDelayInSeconds;
+    double maxDelayInSeconds;
+    double minDelayInSeconds;
+    unsigned int lastSeqNr;
+    unsigned long nrOfMissingSeqNumbers;
+} psa_websocket_subscriber_metrics_entry_t;
+
+//Bookkeeping for one subscriber, stored in receiver->subscribers.map (key = bundle id).
+typedef struct psa_websocket_subscriber_entry {
+    int usageCount;
+    hash_map_t *msgTypes; //map from serializer svc; released with the serializer's destroySerializerMap on destroy
+    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_websocket_subscriber_metrics_entry_t*
+    pubsub_subscriber_t *svc;
+    bool initialized; //true if the init function is called through the receive thread
+} psa_websocket_subscriber_entry_t;
+
+
+//Service tracker callbacks for subscriber services (handle = the receiver).
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+//Entry point of the receive thread (data = the receiver) and the helpers declared for it.
+static void* psa_websocket_recvThread(void * data);
+static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver);
+static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver);
+
+//Websocket callbacks for received data and closed connections.
+//NOTE(review): presumably registered with the civetweb client connection - confirm where the connection is made.
+static int psa_websocketTopicReceiver_data(struct mg_connection *connection, int op_code, char *data, size_t length, void *handle);
+static void psa_websocketTopicReceiver_close(const struct mg_connection *connection, void *handle);
+
+
+/**
+ * Creates a topic receiver for the given scope/topic.
+ *
+ * On success the receiver tracks subscriber services for the topic, holds one
+ * requested connection entry per valid "host:port" item of the
+ * PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES topic property (items separated
+ * by spaces) and has a running receive thread.
+ *
+ * @param ctx             The bundle context.
+ * @param logHelper       Log helper used for all logging of the receiver.
+ * @param scope           Scope of the topic (copied).
+ * @param topic           Topic name (copied).
+ * @param topicProperties Topic properties, read for the static connect addresses.
+ * @param serializerSvcId Service id of the serializer.
+ * @param serializer      The serializer service.
+ * @return The new receiver, or NULL when no URI could be created for scope/topic.
+ */
+pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                              log_helper_t *logHelper,
+                                                              const char *scope,
+                                                              const char *topic,
+                                                              const celix_properties_t *topicProperties,
+                                                              long serializerSvcId,
+                                                              pubsub_serializer_service_t *serializer) {
+    pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+    receiver->ctx = ctx;
+    receiver->logHelper = logHelper;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+    psa_websocket_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
+    receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
+
+    receiver->uri = psa_websocket_createURI(scope, topic);
+    if (receiver->uri == NULL) {
+        //Without a URI nothing else can be set up. Log first: L_ERROR reads
+        //receiver->logHelper, so it has to run before the receiver is freed.
+        L_ERROR("[PSA_WEBSOCKET] Cannot create TopicReceiver for %s/%s", scope, topic);
+        free(receiver->scope);
+        free(receiver->topic);
+        free(receiver);
+        return NULL;
+    }
+
+    celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+    celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+    celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
+    celixThreadMutex_create(&receiver->recvBuffer.mutex, NULL);
+
+    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    arrayList_create(&receiver->recvBuffer.list);
+    arrayList_create(&receiver->recvBuffer.rcvTimes);
+
+    //track subscribers
+    {
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        char buf[size+1];
+        snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+        opts.filter.filter = buf;
+        opts.callbackHandle = receiver;
+        opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber;
+        opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber;
+
+        receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register the statically configured connections ("host:port" items separated by spaces)
+    const char *staticConnects = celix_properties_get(topicProperties, PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
+    if (staticConnects != NULL) {
+        char *copy = strndup(staticConnects, 1024*1024);
+        char* addr;
+        char* save = copy;
+
+        while ((addr = strtok_r(save, " ", &save))) {
+            char *colon = strchr(addr, ':');
+            if (colon == NULL) {
+                continue;
+            }
+
+            char *sockAddr = NULL;
+            asprintf(&sockAddr, "%.*s", (int)(colon - addr), addr);
+
+            long sockPort = atol((colon + 1));
+
+            char *key = NULL;
+            asprintf(&key, "%s:%li", sockAddr, sockPort);
+
+
+            if (sockPort > 0) {
+                psa_websocket_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+                entry->key = key;
+                entry->uri = strndup(receiver->uri, 1024 * 1024);
+                entry->socketAddress = sockAddr;
+                entry->socketPort = sockPort;
+                entry->connected = false;
+                entry->statically = true;
+                entry->recvBuffer = &receiver->recvBuffer;
+                hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+            } else {
+                L_WARN("[PSA_WEBSOCKET_TR] Invalid static socket address %s", addr);
+                free(key);
+                free(sockAddr);
+            }
+        }
+        free(copy);
+    }
+
+    //start the receive thread
+    receiver->recvThread.running = true;
+    celixThread_create(&receiver->recvThread.thread, NULL, psa_websocket_recvThread, receiver);
+    char name[64];
+    snprintf(name, 64, "WEBSOCKET TR %s/%s", scope, topic);
+    celixThread_setName(&receiver->recvThread.thread, name);
+
+    return receiver;
+}
+
+/**
+ * Destroys a topic receiver: stops the receive thread and the subscriber tracker,
+ * releases all subscriber and requested connection entries (closing connections
+ * that are still connected), frees any buffered messages and finally the receiver.
+ *
+ * @param receiver The receiver to destroy; NULL is allowed and ignored.
+ */
+void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver) {
+    if (receiver == NULL) {
+        return;
+    }
+
+    //stop and join the receive thread before anything it uses is released
+    celixThreadMutex_lock(&receiver->recvThread.mutex);
+    receiver->recvThread.running = false;
+    celixThreadMutex_unlock(&receiver->recvThread.mutex);
+    celixThread_join(receiver->recvThread.thread, NULL);
+
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+    //subscriber entries
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    for (hash_map_iterator_t subIt = hashMapIterator_construct(receiver->subscribers.map); hashMapIterator_hasNext(&subIt);) {
+        psa_websocket_subscriber_entry_t *sub = hashMapIterator_nextValue(&subIt);
+        if (sub == NULL) {
+            continue;
+        }
+        //metrics is a map of maps; the inner maps own their keys and values
+        for (hash_map_iterator_t metIt = hashMapIterator_construct(sub->metrics); hashMapIterator_hasNext(&metIt);) {
+            hash_map_t *perOrigin = hashMapIterator_nextValue(&metIt);
+            hashMap_destroy(perOrigin, true, true);
+        }
+        hashMap_destroy(sub->metrics, false, false);
+
+        receiver->serializer->destroySerializerMap(receiver->serializer->handle, sub->msgTypes);
+        free(sub);
+    }
+    hashMap_destroy(receiver->subscribers.map, false, false);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+    //requested connection entries
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    for (hash_map_iterator_t connIt = hashMapIterator_construct(receiver->requestedConnections.map); hashMapIterator_hasNext(&connIt);) {
+        psa_websocket_requested_connection_entry_t *conn = hashMapIterator_nextValue(&connIt);
+        if (conn == NULL) {
+            continue;
+        }
+        if(conn->connected) {
+            mg_close_connection(conn->sockConnection);
+        }
+        free(conn->uri);
+        free(conn->socketAddress);
+        free(conn->key);
+        free(conn);
+    }
+    hashMap_destroy(receiver->requestedConnections.map, false, false);
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_destroy(&receiver->subscribers.mutex);
+    celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+    celixThreadMutex_destroy(&receiver->recvThread.mutex);
+
+    //receive buffer: free whatever was received but not yet handled, last element first
+    celixThreadMutex_destroy(&receiver->recvBuffer.mutex);
+    for (int i = celix_arrayList_size(receiver->recvBuffer.list) - 1; i >= 0; --i) {
+        pubsub_websocket_msg_t *pending = celix_arrayList_get(receiver->recvBuffer.list, i);
+        free(pending);
+    }
+    celix_arrayList_destroy(receiver->recvBuffer.list);
+
+    for (int i = celix_arrayList_size(receiver->recvBuffer.rcvTimes) - 1; i >= 0; --i) {
+        struct timespec *pendingTime = celix_arrayList_get(receiver->recvBuffer.rcvTimes, i);
+        free(pendingTime);
+    }
+    celix_arrayList_destroy(receiver->recvBuffer.rcvTimes);
+
+    free(receiver->uri);
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver);
+}
+
+/**
+ * Returns the scope string stored in the receiver.
+ * The returned pointer refers to memory held by the receiver; it is not copied.
+ */
+const char* pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t *receiver) {
+    const char *scope = receiver->scope;
+    return scope;
+}
+/**
+ * Returns the topic string stored in the receiver.
+ * The returned pointer refers to memory held by the receiver; it is not copied.
+ */
+const char* pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t *receiver) {
+    const char *topic = receiver->topic;
+    return topic;
+}
+
+/**
+ * Returns the serializer service id stored in the receiver.
+ */
+long pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t *receiver) {
+    long svcId = receiver->serializerSvcId;
+    return svcId;
+}
+
+/**
+ * Appends a description of every requested connection to one of the two lists.
+ *
+ * For each entry a new string "<uri>" (with " (static)" appended for statically
+ * configured entries) is allocated with asprintf and added to connectedUrls when
+ * the entry is connected, otherwise to unconnectedUrls. This function does not
+ * free the strings; they are handed over to the lists.
+ *
+ * @param receiver        The topic receiver to inspect.
+ * @param connectedUrls   List receiving the urls of connected entries.
+ * @param unconnectedUrls List receiving the urls of not (yet) connected entries.
+ */
+void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+        char *url = NULL;
+        if (asprintf(&url, "%s%s", entry->uri, entry->statically ? " (static)" : "") < 0) {
+            //asprintf leaves url undefined on failure; never pass that pointer on to the caller
+            L_WARN("[PSA_WEBSOCKET] Cannot allocate url string while listing connections for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            continue;
+        }
+        if (entry->connected) {
+            celix_arrayList_add(connectedUrls, url);
+        } else {
+            celix_arrayList_add(unconnectedUrls, url);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+/**
+ * Registers a requested connection for the given websocket endpoint and then
+ * tries to connect all requested connections that are not connected yet.
+ *
+ * The requestedConnections map is keyed by uri (see the hashMap_put below), so an
+ * existing request is looked up by uri as well. When a request for the uri is
+ * already present nothing is added and the temporary "address:port" key is freed.
+ *
+ * @param receiver      The topic receiver.
+ * @param socketAddress Address of the websocket server.
+ * @param socketPort    Port of the websocket server.
+ * @param uri           Websocket uri to connect to; also used as map key.
+ */
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *receiver, const char *socketAddress, long socketPort, const char *uri) {
+    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s connecting to websocket uri %s", receiver->scope, receiver->topic, uri);
+
+    char *key = NULL;
+    if (asprintf(&key, "%s:%li", socketAddress, socketPort) < 0) {
+        //key is undefined after a failed asprintf, so it must not be stored or freed
+        L_ERROR("[PSA_WEBSOCKET] Cannot allocate connection key for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+        return;
+    }
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    //Look up with the same key that is used for hashMap_put; a mismatching key would
+    //never find the existing entry and the put would silently displace (and leak) it.
+    psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, uri);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        char *uriCopy = strndup(uri, 1024 * 1024);
+        char *addressCopy = strndup(socketAddress, 1024 * 1024);
+        if (entry != NULL && uriCopy != NULL && addressCopy != NULL) {
+            entry->key = key;
+            key = NULL; //ownership moved to the entry
+            entry->uri = uriCopy;
+            entry->socketAddress = addressCopy;
+            entry->socketPort = socketPort;
+            entry->connected = false;
+            entry->statically = false;
+            entry->recvBuffer = &receiver->recvBuffer;
+            hashMap_put(receiver->requestedConnections.map, (void*)entry->uri, entry);
+            receiver->requestedConnections.allConnected = false;
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Cannot allocate connection entry for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            free(uriCopy);
+            free(addressCopy);
+            free(entry);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    //Only still set when the key was not handed over to a new entry.
+    free(key);
+
+    psa_websocket_connectToAllRequestedConnections(receiver);
+}
+
+/**
+ * Removes the requested connection stored under the given uri, closes its
+ * websocket connection when it is connected and frees the entry.
+ * Does nothing when no entry is stored for the uri.
+ *
+ * @param receiver The topic receiver.
+ * @param uri      The websocket uri that was used as key when the connection was requested.
+ */
+void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t *receiver, const char *uri) {
+    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s disconnect from websocket uri %s", receiver->scope, receiver->topic, uri);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_websocket_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, uri);
+    if (entry != NULL) {
+        if (entry->connected) {
+            //mg_close_connection is used as a plain statement here (no status is checked),
+            //so there is no failure to report after closing.
+            mg_close_connection(entry->sockConnection);
+        }
+        free(entry->socketAddress);
+        free(entry->uri);
+        free(entry->key);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+/**
+ * Callback invoked when a subscriber service is added.
+ *
+ * Subscribers whose scope property (default "default") differs from the receiver
+ * scope are ignored. Subscribers are tracked per bundle id: an existing entry only
+ * gets its usage count incremented, a new entry gets a serializer map and an
+ * (initially empty) origins map per message type for metrics.
+ *
+ * @param handle The topic receiver (pubsub_websocket_topic_receiver_t*).
+ * @param svc    The subscriber service.
+ * @param props  The service properties of the subscriber.
+ * @param bnd    The bundle owning the subscriber service.
+ */
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_websocket_topic_receiver_t *receiver = handle;
+
+    long bndId = celix_bundle_getId(bnd);
+    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    //Full comparison: comparing only strlen(receiver->scope) characters would also
+    //accept any subscriber scope that merely starts with the receiver scope.
+    if (strcmp(subScope, receiver->scope) != 0) {
+        //not the same scope. ignore
+        return;
+    }
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->usageCount += 1;
+    } else {
+        //new create entry
+        entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            L_ERROR("[PSA_WEBSOCKET] Cannot allocate subscriber entry for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            celixThreadMutex_unlock(&receiver->subscribers.mutex);
+            return;
+        }
+        entry->usageCount = 1;
+        entry->svc = svc;
+        entry->initialized = false;
+
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+
+        if (rc == 0) {
+            //one origins map (origin uuid string -> metrics) per message type id
+            entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
+            hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+                hash_map_t *origins = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+                hashMap_put(entry->metrics, (void*)(uintptr_t)msgSer->msgId, origins);
+            }
+
+            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+            //The initialize pass is skipped while allInitialized is true, so flag that
+            //there is a new, not yet initialized subscriber.
+            receiver->subscribers.allInitialized = false;
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            free(entry);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+/**
+ * Callback invoked when a subscriber service is removed.
+ *
+ * Decrements the usage count of the entry stored for the owning bundle. When the
+ * count drops to zero (or below) the entry is removed from the subscribers map and
+ * its serializer map, metrics maps and the entry itself are released.
+ *
+ * @param handle The topic receiver (pubsub_websocket_topic_receiver_t*).
+ * @param svc    Unused.
+ * @param props  Unused.
+ * @param bnd    The bundle owning the removed subscriber service.
+ */
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props __attribute__((unused)), const celix_bundle_t *bnd) {
+    pubsub_websocket_topic_receiver_t *receiver = handle;
+    const long bundleId = celix_bundle_getId(bnd);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_websocket_subscriber_entry_t *subscriber = hashMap_get(receiver->subscribers.map, (void*)bundleId);
+    if (subscriber != NULL) {
+        subscriber->usageCount -= 1;
+        if (subscriber->usageCount <= 0) {
+            //last user is gone: drop the entry and everything it holds
+            hashMap_remove(receiver->subscribers.map, (void*)bundleId);
+            if (receiver->serializer->destroySerializerMap(receiver->serializer->handle, subscriber->msgTypes) != 0) {
+                L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            }
+            hash_map_iterator_t metricsIter;
+            for (metricsIter = hashMapIterator_construct(subscriber->metrics); hashMapIterator_hasNext(&metricsIter); ) {
+                hash_map_t *originsMap = hashMapIterator_nextValue(&metricsIter);
+                hashMap_destroy(originsMap, true, true);
+            }
+            hashMap_destroy(subscriber->metrics, false, false);
+            free(subscriber);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+/**
+ * Delivers one received message to one subscriber entry and, when metrics are
+ * enabled, updates the per message type / per origin metrics of that entry.
+ *
+ * The message is only deserialized and delivered when a serializer for hdr->type
+ * exists in the entry and psa_websocket_checkVersion accepts the header.
+ * The caller must hold receiver->subscribers.mutex.
+ *
+ * @param receiver    The topic receiver.
+ * @param entry       The subscriber entry to deliver to.
+ * @param hdr         Header of the received message.
+ * @param payload     Serialized payload.
+ * @param payloadSize Size of the payload in bytes.
+ * @param receiveTime Time at which the message was received.
+ */
+static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, const pubsub_websocket_msg_header_t *hdr, const void* payload, size_t payloadSize, struct timespec *receiveTime) {
+    //NOTE receiver->subscribers.mutex locked
+    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
+    pubsub_subscriber_t *svc = entry->svc;
+    bool monitor = receiver->metricsEnabled;
+
+    //monitoring
+    //Zero-initialized: the metrics code below reads both values even when the version
+    //check fails and no deserialization (and thus no clock_gettime) took place.
+    struct timespec beginSer = {0, 0};
+    struct timespec endSer = {0, 0};
+    int updateReceiveCount = 0;
+    int updateSerError = 0;
+
+    if (msgSer!= NULL) {
+        void *deserializedMsg = NULL;
+        bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion, hdr);
+        if (validVersion) {
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &beginSer);
+            }
+            celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &endSer);
+            }
+            if (status == CELIX_SUCCESS) {
+                //the subscriber can clear 'release' to take over ownership of the message
+                bool release = true;
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+                if (release) {
+                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
+                }
+                updateReceiveCount += 1;
+            } else {
+                updateSerError += 1;
+                L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+            }
+        }
+    } else {
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X", hdr->type);
+    }
+
+    if (msgSer != NULL && monitor) {
+        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )hdr->type);
+        char uuidStr[UUID_STR_LEN+1];
+        uuid_unparse(hdr->originUUID, uuidStr);
+        psa_websocket_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
+
+        if (metrics == NULL) {
+            //first message of this type from this origin
+            metrics = calloc(1, sizeof(*metrics));
+            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
+            uuid_copy(metrics->origin, hdr->originUUID);
+            metrics->msgTypeId = hdr->type;
+            metrics->maxDelayInSeconds = -INFINITY;
+            metrics->minDelayInSeconds = INFINITY;
+            metrics->lastSeqNr = 0;
+        }
+
+        //running average of the deserialization time
+        double diff = celix_difftime(&beginSer, &endSer);
+        long n = metrics->nrOfMessagesReceived;
+        metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
+
+        //running average of the time between two messages; needs a previous message
+        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+        n = metrics->nrOfMessagesReceived;
+        if (metrics->nrOfMessagesReceived >= 1) {
+            metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+        }
+        metrics->lastMessageReceived = *receiveTime;
+
+
+        //a jump of more than one in the sequence number counts as missed messages
+        int incr = hdr->seqNr - metrics->lastSeqNr;
+        if (metrics->lastSeqNr >0 && incr > 1) {
+            metrics->nrOfMissingSeqNumbers += (incr - 1);
+            L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
+        }
+        metrics->lastSeqNr = hdr->seqNr;
+
+        //delay between the send time in the header and the local receive time
+        struct timespec sendTime;
+        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
+        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
+        diff = celix_difftime(&sendTime, receiveTime);
+        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
+        if (diff < metrics->minDelayInSeconds) {
+            metrics->minDelayInSeconds = diff;
+        }
+        if (diff > metrics->maxDelayInSeconds) {
+            metrics->maxDelayInSeconds = diff;
+        }
+
+        metrics->nrOfMessagesReceived += updateReceiveCount;
+        metrics->nrOfSerializationErrors += updateSerError;
+    }
+}
+
+/**
+ * Hands one received message to every (non-NULL) subscriber entry of the receiver
+ * while holding receiver->subscribers.mutex.
+ */
+static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const pubsub_websocket_msg_header_t *hdr, const char *payload, size_t payloadSize, struct timespec *receiveTime) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t subscriberIter;
+    for (subscriberIter = hashMapIterator_construct(receiver->subscribers.map); hashMapIterator_hasNext(&subscriberIter); ) {
+        psa_websocket_subscriber_entry_t *subscriber = hashMapIterator_nextValue(&subscriberIter);
+        if (subscriber == NULL) {
+            continue;
+        }
+        processMsgForSubscriberEntry(receiver, subscriber, hdr, payload, payloadSize, receiveTime);
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+/**
+ * Thread function of the receive thread.
+ *
+ * While recvThread.running is set it (re)tries to connect pending connections,
+ * (re)tries to initialize pending subscribers and drains the receive buffer:
+ * every buffered message is handed to processMsg together with its receive time
+ * and freed afterwards.
+ *
+ * @param data The topic receiver (pubsub_websocket_topic_receiver_t*).
+ * @return Always NULL.
+ */
+static void* psa_websocket_recvThread(void * data) {
+    pubsub_websocket_topic_receiver_t *receiver = data;
+
+    celixThreadMutex_lock(&receiver->recvThread.mutex);
+    bool running = receiver->recvThread.running;
+    celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    bool allConnected = receiver->requestedConnections.allConnected;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    bool allInitialized = receiver->subscribers.allInitialized;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
+    while (running) {
+        if (!allConnected) {
+            psa_websocket_connectToAllRequestedConnections(receiver);
+        }
+        if (!allInitialized) {
+            psa_websocket_initializeAllSubscribers(receiver);
+        }
+
+        bool hasMsg = true;
+        while (hasMsg) {
+            pubsub_websocket_msg_t *msg = NULL;
+            struct timespec *rcvTime = NULL;
+
+            //Check the size and take the oldest message out of both lists under one lock:
+            //the websocket data callback appends to these lists from another thread, and
+            //the lists must never hold pointers that are already freed.
+            celixThreadMutex_lock(&receiver->recvBuffer.mutex);
+            hasMsg = celix_arrayList_size(receiver->recvBuffer.list) > 0;
+            if (hasMsg) {
+                msg = (pubsub_websocket_msg_t *) celix_arrayList_get(receiver->recvBuffer.list, 0);
+                rcvTime = (struct timespec *) celix_arrayList_get(receiver->recvBuffer.rcvTimes, 0);
+                celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
+                celix_arrayList_removeAt(receiver->recvBuffer.rcvTimes, 0);
+            }
+            celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
+
+            if (hasMsg) {
+                //processed outside the buffer lock so receiving is not blocked by subscribers
+                processMsg(receiver, &msg->header, msg->payload, msg->payloadSize, rcvTime);
+                free(msg);
+                free(rcvTime);
+            }
+        }
+
+        celixThreadMutex_lock(&receiver->recvThread.mutex);
+        running = receiver->recvThread.running;
+        celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        allConnected = receiver->requestedConnections.allConnected;
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        allInitialized = receiver->subscribers.allInitialized;
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    } // while
+
+    return NULL;
+}
+
+/**
+ * Websocket data callback: copies a received frame and appends it, together with
+ * the receive time, to the receive buffer referenced by the connection entry.
+ *
+ * A frame is only buffered when it is at least as large as the fixed part of a
+ * message (header + payloadSize field) and the payloadSize stated in the frame
+ * matches the number of bytes that follow the fixed part. Other frames are dropped.
+ *
+ * @param connection Unused.
+ * @param op_code    Unused.
+ * @param data       The received bytes.
+ * @param length     Number of received bytes.
+ * @param handle     The connection entry (psa_websocket_requested_connection_entry_t*), may be NULL.
+ * @return Always 1, which keeps the socket open (0 would close it).
+ */
+static int psa_websocketTopicReceiver_data(struct mg_connection *connection __attribute__((unused)),
+                                            int op_code __attribute__((unused)),
+                                            char *data,
+                                            size_t length,
+                                            void *handle) {
+    //Received a websocket message, append this message to the buffer of the receiver.
+    if (handle != NULL) {
+        psa_websocket_requested_connection_entry_t *entry = (psa_websocket_requested_connection_entry_t *) handle;
+
+        pubsub_websocket_msg_t *rcvdMsg = NULL;
+        //sizeof does not evaluate its operand, so using the NULL pointer here is fine
+        const size_t fixedSize = sizeof(rcvdMsg->header) + sizeof(rcvdMsg->payloadSize);
+
+        //The data comes from the network: a frame shorter than the fixed part would make
+        //the payload size calculation wrap around and payloadSize be read out of bounds.
+        if (data != NULL && length >= fixedSize) {
+            rcvdMsg = malloc(length);
+            struct timespec *receiveTime = malloc(sizeof(*receiveTime));
+            if (rcvdMsg != NULL && receiveTime != NULL) {
+                memcpy(rcvdMsg, data, length);
+
+                //Check if payload is completely received
+                unsigned long rcvdPayloadSize = length - fixedSize;
+                if(rcvdMsg->payloadSize == rcvdPayloadSize) {
+                    clock_gettime(CLOCK_REALTIME, receiveTime);
+                    celixThreadMutex_lock(&entry->recvBuffer->mutex);
+                    celix_arrayList_add(entry->recvBuffer->list, rcvdMsg);
+                    celix_arrayList_add(entry->recvBuffer->rcvTimes, receiveTime);
+                    celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+                } else {
+                    free(rcvdMsg);
+                    free(receiveTime);
+                }
+            } else {
+                //out of memory: drop the frame, free(NULL) is a no-op
+                free(rcvdMsg);
+                free(receiveTime);
+            }
+        }
+    }
+
+    return 1; //keep open (non-zero), 0 to close the socket
+}
+
+/**
+ * Websocket close callback: marks the connection entry passed as handle as not
+ * connected and clears its connection pointer. A NULL handle is ignored.
+ */
+static void psa_websocketTopicReceiver_close(const struct mg_connection *connection __attribute__((unused)), void *handle) {
+    psa_websocket_requested_connection_entry_t *closedEntry = (psa_websocket_requested_connection_entry_t *) handle;
+    if (closedEntry == NULL) {
+        return;
+    }
+    //Reset connection for this receiver entry
+    closedEntry->connected = false;
+    closedEntry->sockConnection = NULL;
+}
+
+/**
+ * Builds a snapshot of the receiver metrics.
+ *
+ * The result is allocated with calloc and filled with the receiver scope and topic,
+ * one msgTypes element per (subscriber entry, message type) pair and, per element,
+ * one origins element per known origin. The snapshot is taken while holding
+ * receiver->subscribers.mutex. This function does not free the result.
+ *
+ * @param receiver The topic receiver.
+ * @return The newly allocated metrics snapshot.
+ */
+pubsub_admin_receiver_metrics_t* pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t *receiver) {
+    pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+
+    //pass 1: count the message types over all subscriber entries
+    size_t msgTypesCount = 0;
+    hash_map_iterator_t subIter;
+    for (subIter = hashMapIterator_construct(receiver->subscribers.map); hashMapIterator_hasNext(&subIter); ) {
+        psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&subIter);
+        hash_map_iterator_t typeIter;
+        for (typeIter = hashMapIterator_construct(entry->metrics); hashMapIterator_hasNext(&typeIter); ) {
+            hashMapIterator_nextValue(&typeIter);
+            msgTypesCount += 1;
+        }
+    }
+
+    result->nrOfMsgTypes = (unsigned long)msgTypesCount;
+    result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
+
+    //pass 2: copy the metrics, i indexes msgTypes and k indexes the origins of one msg type
+    int i = 0;
+    for (subIter = hashMapIterator_construct(receiver->subscribers.map); hashMapIterator_hasNext(&subIter); ) {
+        psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&subIter);
+        hash_map_iterator_t typeIter;
+        for (typeIter = hashMapIterator_construct(entry->metrics); hashMapIterator_hasNext(&typeIter); ) {
+            hash_map_t *origins = hashMapIterator_nextValue(&typeIter);
+            result->msgTypes[i].origins = calloc((size_t)hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
+
+            int k = 0;
+            hash_map_iterator_t originIter;
+            for (originIter = hashMapIterator_construct(origins); hashMapIterator_hasNext(&originIter); ) {
+                psa_websocket_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&originIter);
+                result->msgTypes[i].typeId = metrics->msgTypeId;
+                pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)metrics->msgTypeId);
+                if (!msgSer) {
+                    L_WARN("[PSA_WEBSOCKET]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId);
+                    continue;
+                }
+
+                snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+                uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
+                result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
+                result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors;
+                result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
+                result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
+                result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
+                result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+                result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
+                result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
+                result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
+                k += 1;
+            }
+            i += 1;
+        }
+    }
+
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+    return result;
+}
+
+
+/**
+ * Tries to open a websocket client connection for every requested connection that
+ * is not connected yet and stores in requestedConnections.allConnected whether all
+ * attempts succeeded. Does nothing when allConnected is already set.
+ * A warning is logged on every 10th consecutive failed attempt of an entry.
+ */
+static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (receiver->requestedConnections.allConnected) {
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+        return;
+    }
+
+    bool everythingConnected = true;
+    hash_map_iterator_t connIter;
+    for (connIter = hashMapIterator_construct(receiver->requestedConnections.map); hashMapIterator_hasNext(&connIter); ) {
+        psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&connIter);
+        if (entry->connected) {
+            continue;
+        }
+
+        char errBuf[100] = {0};
+        entry->sockConnection = mg_connect_websocket_client(entry->socketAddress,
+                                                            (int) entry->socketPort,
+                                                            0, // No ssl
+                                                            errBuf,
+                                                            (size_t) sizeof(errBuf),
+                                                            entry->uri,
+                                                            NULL,
+                                                            psa_websocketTopicReceiver_data,
+                                                            psa_websocketTopicReceiver_close,
+                                                            entry);
+        if (entry->sockConnection == NULL) {
+            entry->connectRetryCount += 1;
+            everythingConnected = false;
+            if ((entry->connectRetryCount % 10) == 0) {
+                L_WARN("[PSA_WEBSOCKET] Error connecting to websocket %s:%li/%s. Error: %s",
+                       entry->socketAddress,
+                       entry->socketPort,
+                       entry->uri, errBuf);
+            }
+        } else {
+            entry->connected = true;
+            entry->connectRetryCount = 0;
+        }
+    }
+    receiver->requestedConnections.allConnected = everythingConnected;
+
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+/**
+ * Calls the optional init function of every subscriber that is not initialized yet
+ * and stores in subscribers.allInitialized whether all of them succeeded.
+ * Does nothing when allInitialized is already set. A subscriber without a service
+ * or without an init function counts as initialized.
+ */
+static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (receiver->subscribers.allInitialized) {
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
+        return;
+    }
+
+    bool everythingInitialized = true;
+    hash_map_iterator_t subIter;
+    for (subIter = hashMapIterator_construct(receiver->subscribers.map); hashMapIterator_hasNext(&subIter); ) {
+        psa_websocket_subscriber_entry_t *subscriber = hashMapIterator_nextValue(&subIter);
+        if (subscriber->initialized) {
+            continue;
+        }
+
+        int rc = 0;
+        if (subscriber->svc != NULL && subscriber->svc->init != NULL) {
+            rc = subscriber->svc->init(subscriber->svc->handle);
+        }
+        if (rc != 0) {
+            L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+            everythingInitialized = false;
+        } else {
+            subscriber->initialized = true;
+        }
+    }
+    receiver->subscribers.allInitialized = everythingInitialized;
+
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
new file mode 100644
index 0000000..1a00cfa
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_bundle_context.h"
+
+/* Opaque handle of a websocket topic receiver; the struct is defined in the .c file. */
+typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t;
+
+/*
+ * Creates a topic receiver for the given scope and topic.
+ * NOTE(review): the implementation is not part of this header; presumably the
+ * returned receiver must be released with pubsub_websocketTopicReceiver_destroy - confirm.
+ */
+pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        const celix_properties_t *topicProperties,
+        long serializerSvcId,
+        pubsub_serializer_service_t *serializer);
+/* Destroys a topic receiver created with pubsub_websocketTopicReceiver_create. */
+void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver);
+
+/* Accessors for the scope and topic of the receiver. */
+const char* pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t *receiver);
+const char* pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t *receiver);
+
+/* Returns the serializer service id of the receiver. */
+long pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t *receiver);
+/* Adds the urls of the requested connections to connectedUrls or unconnectedUrls. */
+void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls);
+
+/* Requests a connection to the websocket at socketAddress:socketPort with the given uri. */
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *receiver, const char *socketAddress, long socketPort, const char *uri);
+/* Drops the requested connection for the given uri. */
+void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t *receiver, const char *uri);
+
+
+/* Returns the metrics collected by the receiver. */
+pubsub_admin_receiver_metrics_t* pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t *receiver);
+
+
+#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
new file mode 100644
index 0000000..42f66c7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <zconf.h>
+#include <log_helper.h>
+#include "pubsub_websocket_topic_sender.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_common.h"
+#include <uuid/uuid.h>
+#include <constants.h>
+#include "http_admin/api.h"
+#include "civetweb.h"
+
+/* Number of seconds delay_first_send_for_late_joiners() sleeps before the very first send. */
+#define FIRST_SEND_DELAY_IN_SECONDS             2
+
+/*
+ * Logging shorthands. Each macro expands to a logHelper_log() call on sender->logHelper,
+ * so a variable named `sender` must be in scope wherever they are used.
+ */
+#define L_DEBUG(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+/* State of one websocket topic sender (one scope/topic combination). */
+struct pubsub_websocket_topic_sender {
+    celix_bundle_context_t *ctx;             //context used to register/unregister the services below
+    log_helper_t *logHelper;                 //used by the L_* logging macros
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer; //creates/destroys the per-bundle serializer maps
+    uuid_t fwUUID;                           //parsed from the framework UUID property, copied into every message header
+    bool metricsEnabled;                     //read once at creation from PSA_WEBSOCKET_METRICS_ENABLED
+
+    char *scope;                             //copy owned by the sender
+    char *topic;                             //copy owned by the sender
+    char scopeAndTopicFilter[5];             //filled by psa_websocket_setScopeAndTopicFilter()
+    char *uri;                               //result of psa_websocket_createURI(), freed on destroy
+
+    celix_websocket_service_t websockSvc;    //websocket service registered under WEBSOCKET_ADMIN_SERVICE_NAME
+    long websockSvcId;
+    struct mg_connection *sockConnection;    //set in the ready callback, reset to NULL in the close callback
+
+    struct {
+        long svcId;
+        celix_service_factory_t factory;     //hands out one publisher service per requesting bundle
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex;          //protects map
+        hash_map_t *map;  //key = bndId, value = psa_websocket_bounded_service_entry_t
+    } boundedServices;
+};
+
+/*
+ * Per message type send state, stored in the msgEntries map of a bounded service entry.
+ * NOTE(review): no celixThreadMutex_create() call for sendLock is visible in this file;
+ * entries are calloc'ed, so the lock appears to rely on zero-initialization - confirm.
+ * NOTE(review): the seqNr field below is not referenced by the code visible here; the
+ * send path increments header.seqNr instead - confirm whether the field is still needed.
+ */
+typedef struct psa_websocket_send_msg_entry {
+    pubsub_websocket_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
+    pubsub_msg_serializer_t *msgSer; //serializer for this message type, taken from the serializer map
+    celix_thread_mutex_t sendLock; //protects send & Seqnr
+    unsigned int seqNr;
+    struct {
+        celix_thread_mutex_t mutex; //protects entries in struct
+        unsigned long nrOfMessagesSend;
+        unsigned long nrOfMessagesSendFailed;
+        unsigned long nrOfSerializationErrors;
+        struct timespec lastMessageSend;
+        double averageTimeBetweenMessagesInSeconds;
+        double averageSerializationTimeInSeconds;
+    } metrics;
+} psa_websocket_send_msg_entry_t;
+
+/*
+ * Publisher service instance handed out to one requesting bundle.
+ * Created on the first getService call for a bundle and released when getCount drops to 0.
+ */
+typedef struct psa_websocket_bounded_service_entry {
+    pubsub_websocket_topic_sender_t *parent; //sender that owns this entry
+    pubsub_publisher_t service;              //service struct returned to the requesting bundle
+    long bndId;                              //id of the requesting bundle
+    hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+    hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t
+    int getCount;                            //number of outstanding getService calls for this bundle
+} psa_websocket_bounded_service_entry_t;
+
+
+static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender);
+
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+
+static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle);
+static void psa_websocketTopicSender_close(const struct mg_connection *connection, void *handle);
+
+/**
+ * Creates a topic sender that publishes messages for the given scope/topic over a websocket.
+ *
+ * A websocket service is registered for the URI built by psa_websocket_createURI(), followed
+ * by a publisher service factory that carries the topic and scope as service properties.
+ *
+ * @param ctx             Bundle context used for property lookup and service registration.
+ * @param logHelper       Log helper stored on the sender and used by the L_* macros.
+ * @param scope           Scope of the publication; copied into the sender.
+ * @param topic           Topic of the publication; copied into the sender.
+ * @param serializerSvcId Service id of the serializer, stored for later retrieval.
+ * @param ser             Serializer service used to create/destroy message serializer maps.
+ * @return The new sender, or NULL when allocation, URI creation or registration of the
+ *         websocket service fails.
+ */
+pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        pubsub_serializer_service_t *ser) {
+    pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    if (sender == NULL) {
+        return NULL;
+    }
+
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+    sender->serializerSvcId = serializerSvcId;
+    sender->serializer = ser;
+    //mark both services as "not registered" until registration actually succeeds
+    sender->websockSvcId = -1;
+    sender->publisher.svcId = -1;
+    psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+    const char* uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (uuid != NULL) {
+        uuid_parse(uuid, sender->fwUUID);
+    }
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
+
+    sender->uri = psa_websocket_createURI(scope, topic);
+
+    if (sender->uri != NULL) {
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, WEBSOCKET_ADMIN_URI, sender->uri);
+
+        sender->websockSvc.handle = sender;
+        sender->websockSvc.ready = psa_websocketTopicSender_ready;
+        sender->websockSvc.close = psa_websocketTopicSender_close;
+        sender->websockSvcId = celix_bundleContext_registerService(ctx, &sender->websockSvc,
+                                                                   WEBSOCKET_ADMIN_SERVICE_NAME, props);
+    }
+
+    //Only a negative id signals a failed registration; 0 is a valid service id.
+    if (sender->websockSvcId < 0) {
+        free(sender->uri); //free(NULL) is a no-op when the URI could not be created
+        free(sender);
+        return NULL;
+    }
+
+    sender->scope = strndup(scope, 1024 * 1024);
+    sender->topic = strndup(topic, 1024 * 1024);
+
+    celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+    sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    //register publisher services using a service factory
+    sender->publisher.factory.handle = sender;
+    sender->publisher.factory.getService = psa_websocket_getPublisherService;
+    sender->publisher.factory.ungetService = psa_websocket_ungetPublisherService;
+
+    celix_properties_t *props = celix_properties_create();
+    celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+    celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+    celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+    opts.factory = &sender->publisher.factory;
+    opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+    opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+    opts.properties = props;
+
+    sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+
+    return sender;
+}
+
+/**
+ * Tears down a topic sender: unregisters the publisher factory, releases every remaining
+ * bounded service entry (serializer map and per message type entries), unregisters the
+ * websocket service and frees the sender. A NULL sender is ignored.
+ */
+void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender) {
+    if (sender == NULL) {
+        return;
+    }
+
+    celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    hash_map_iterator_t boundIter = hashMapIterator_construct(sender->boundedServices.map);
+    while (hashMapIterator_hasNext(&boundIter)) {
+        psa_websocket_bounded_service_entry_t *bound = hashMapIterator_nextValue(&boundIter);
+        if (bound == NULL) {
+            continue;
+        }
+
+        sender->serializer->destroySerializerMap(sender->serializer->handle, bound->msgTypes);
+
+        //the map does not own its values, so free every send entry by hand
+        hash_map_iterator_t msgIter = hashMapIterator_construct(bound->msgEntries);
+        while (hashMapIterator_hasNext(&msgIter)) {
+            psa_websocket_send_msg_entry_t *sendEntry = hashMapIterator_nextValue(&msgIter);
+            celixThreadMutex_destroy(&sendEntry->metrics.mutex);
+            free(sendEntry);
+        }
+        hashMap_destroy(bound->msgEntries, false, false);
+
+        free(bound);
+    }
+    hashMap_destroy(sender->boundedServices.map, false, false);
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+    celixThreadMutex_destroy(&sender->boundedServices.mutex);
+
+    celix_bundleContext_unregisterService(sender->ctx, sender->websockSvcId);
+
+    free(sender->scope);
+    free(sender->topic);
+    free(sender->uri);
+    free(sender);
+}
+
+/* Returns the serializer service id that was passed to pubsub_websocketTopicSender_create(). */
+long pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t *sender) {
+    return sender->serializerSvcId;
+}
+
+/* Returns the sender's own copy of the scope; the string stays owned by the sender. */
+const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t *sender) {
+    return sender->scope;
+}
+
+/* Returns the sender's own copy of the topic; the string stays owned by the sender. */
+const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t *sender) {
+    return sender->topic;
+}
+
+/* Returns the URI the websocket service was registered with; the string stays owned by the sender. */
+const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sender) {
+    return sender->uri;
+}
+
+
+/**
+ * Service factory callback: returns the publisher service for the requesting bundle.
+ *
+ * The first request of a bundle creates a bounded service entry with a serializer map and
+ * one send entry per message type; later requests only increase the entry's use count.
+ *
+ * @param handle           The topic sender (pubsub_websocket_topic_sender_t*).
+ * @param requestingBundle Bundle asking for the publisher service.
+ * @param svcProperties    Unused.
+ * @return Pointer to the bundle's pubsub_publisher_t, or NULL when the entry could not be
+ *         allocated or the serializer map could not be created.
+ */
+static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_websocket_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_websocket_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount += 1;
+    } else {
+        int rc = -1;
+        entry = calloc(1, sizeof(*entry));
+        if (entry != NULL) {
+            entry->getCount = 1;
+            entry->parent = sender;
+            entry->bndId = bndId;
+            entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+
+            rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        }
+
+        if (entry != NULL && rc == 0) {
+            hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter);
+                void *key = hashMapEntry_getKey(hashMapEntry);
+                psa_websocket_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry));
+                if (sendEntry == NULL) {
+                    //without a send entry this message type is simply unknown to the send path
+                    L_ERROR("Error allocating send entry for websocket TopicSender %s/%s", sender->scope, sender->topic);
+                    continue;
+                }
+                sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
+                sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+                int major;
+                int minor;
+                version_getMajor(sendEntry->msgSer->msgVersion, &major);
+                version_getMinor(sendEntry->msgSer->msgVersion, &minor);
+                sendEntry->header.major = (uint8_t)major;
+                sendEntry->header.minor = (uint8_t)minor;
+                uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+                celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
+                hashMap_put(entry->msgEntries, key, sendEntry);
+            }
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType;
+            entry->service.send = psa_websocket_topicPublicationSend;
+            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+        } else {
+            L_ERROR("Error creating serializer map for websocket TopicSender %s/%s", sender->scope, sender->topic);
+            //The entry is not tracked in the map, so release it here instead of leaking it
+            //and handing out a service struct whose function pointers are still NULL.
+            if (entry != NULL) {
+                hashMap_destroy(entry->msgEntries, false, false);
+                free(entry);
+                entry = NULL;
+            }
+        }
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+    return entry != NULL ? &entry->service : NULL;
+}
+
+/**
+ * Service factory callback: releases one use of the requesting bundle's publisher service.
+ * When the use count of the bundle's entry reaches zero the entry is removed from the
+ * bounded services map and its serializer map and send entries are freed.
+ */
+static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_websocket_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_websocket_bounded_service_entry_t *bound = hashMap_get(sender->boundedServices.map, (void*)bndId);
+    bool lastUse = false;
+    if (bound != NULL) {
+        bound->getCount -= 1;
+        lastUse = (bound->getCount == 0);
+    }
+
+    if (lastUse) {
+        //free entry
+        hashMap_remove(sender->boundedServices.map, (void*)bndId);
+        if (sender->serializer->destroySerializerMap(sender->serializer->handle, bound->msgTypes) != 0) {
+            L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+        }
+
+        hash_map_iterator_t msgIter = hashMapIterator_construct(bound->msgEntries);
+        while (hashMapIterator_hasNext(&msgIter)) {
+            psa_websocket_send_msg_entry_t *sendEntry = hashMapIterator_nextValue(&msgIter);
+            celixThreadMutex_destroy(&sendEntry->metrics.mutex);
+            free(sendEntry);
+        }
+        hashMap_destroy(bound->msgEntries, false, false);
+
+        free(bound);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+/**
+ * Collects a snapshot of the send metrics of every message type of every bounded service.
+ *
+ * @param sender The topic sender to inspect.
+ * @return A newly allocated metrics struct holding one msgMetrics element per
+ *         bundle/message type combination, or NULL when the struct itself cannot be
+ *         allocated. When the element array cannot be allocated nrOfmsgMetrics is 0.
+ */
+pubsub_admin_sender_metrics_t* pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender) {
+    pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
+    if (result == NULL) {
+        return NULL;
+    }
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+
+    //first pass: count the entries so the array can be allocated in one go
+    size_t count = 0;
+    hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hashMapIterator_nextValue(&iter2);
+            count += 1;
+        }
+    }
+
+    //size the array by its element type, not by the enclosing metrics struct
+    result->msgMetrics = calloc(count, sizeof(*result->msgMetrics));
+    if (result->msgMetrics == NULL) {
+        count = 0; //nothing can be stored; report an empty snapshot
+    }
+
+    //second pass: copy the metrics; the map cannot change because the lock is still held
+    size_t i = 0;
+    iter = hashMapIterator_construct(sender->boundedServices.map);
+    while (i < count && hashMapIterator_hasNext(&iter)) {
+        psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
+        while (i < count && hashMapIterator_hasNext(&iter2)) {
+            psa_websocket_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
+            celixThreadMutex_lock(&mEntry->metrics.mutex);
+            result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
+            result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
+            result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
+            result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
+            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+            result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
+            result->msgMetrics[i].bndId = entry->bndId;
+            result->msgMetrics[i].typeId = mEntry->header.type;
+            snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
+            i += 1;
+            celixThreadMutex_unlock(&mEntry->metrics.mutex);
+        }
+    }
+
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    result->nrOfmsgMetrics = (int)count;
+    return result;
+}
+
+/**
+ * Publisher service send callback: serializes inMsg and writes it to the websocket.
+ *
+ * Nothing is sent when no websocket connection is established or when the message type id
+ * is unknown for the calling bundle.
+ *
+ * @param handle    The bounded service entry (psa_websocket_bounded_service_entry_t*).
+ * @param msgTypeId Message type id used to look up the send entry.
+ * @param inMsg     Message to serialize and send.
+ * @return CELIX_SUCCESS when the message was serialized and handed to the websocket write
+ *         (a short write is only logged and counted in the metrics),
+ *         CELIX_ENOMEM when the message buffer cannot be allocated, the serializer's
+ *         status when serialization fails, otherwise CELIX_SERVICE_EXCEPTION.
+ */
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+    int status = CELIX_SERVICE_EXCEPTION;
+    psa_websocket_bounded_service_entry_t *bound = handle;
+    pubsub_websocket_topic_sender_t *sender = bound->parent;
+    bool monitor = sender->metricsEnabled;
+    psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
+
+    //metrics updates
+    struct timespec sendTime = {0, 0};
+    struct timespec serializationStart = {0, 0};
+    struct timespec serializationEnd = {0, 0};
+    int sendErrorUpdate = 0;
+    int serializationErrorUpdate = 0;
+    int sendCountUpdate = 0;
+
+    if (sender->sockConnection != NULL && entry != NULL) {
+        delay_first_send_for_late_joiners(sender);
+
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &serializationStart);
+        }
+
+        void *serializedOutput = NULL;
+        size_t serializedOutputLen = 0;
+        status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen);
+
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &serializationEnd);
+        }
+
+        if (status == CELIX_SUCCESS /*ser ok*/) {
+            pubsub_websocket_msg_t *msg = malloc(sizeof(*msg) + serializedOutputLen);
+            if (msg != NULL) {
+                celixThreadMutex_lock(&entry->sendLock);
+
+                pubsub_websocket_msg_header_t *msgHdr = &entry->header;
+                //the sequence number identifies the message, so it advances on every send,
+                //not only when metrics are enabled
+                msgHdr->seqNr++;
+                if (monitor) {
+                    clock_gettime(CLOCK_REALTIME, &sendTime);
+                    msgHdr->sendtimeSeconds = (uint64_t) sendTime.tv_sec;
+                    msgHdr->sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
+                }
+                memcpy(&msg->header, msgHdr, sizeof(pubsub_websocket_msg_header_t));
+
+                msg->payloadSize = (unsigned int) serializedOutputLen;
+                size_t bytes_to_write = sizeof(msg->header) + sizeof(msg->payloadSize) + serializedOutputLen;
+                memcpy(msg->payload, serializedOutput, serializedOutputLen);
+                int bytes_written = mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, (char *) msg, bytes_to_write);
+
+                celixThreadMutex_unlock(&entry->sendLock);
+                if (bytes_written == (int) bytes_to_write) {
+                    sendCountUpdate = 1;
+                } else {
+                    sendErrorUpdate = 1;
+                    L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket.");
+                }
+
+                free(msg);
+            } else {
+                status = CELIX_ENOMEM;
+                sendErrorUpdate = 1;
+                L_ERROR("[PSA_WEBSOCKET_TS] Error allocating message buffer for scope/topic %s/%s", sender->scope, sender->topic);
+            }
+            free(serializedOutput);
+        } else {
+            serializationErrorUpdate = 1;
+            L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s",
+                   entry->msgSer->msgName, sender->scope, sender->topic);
+        }
+    } else if (entry == NULL){
+        L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+    }
+
+
+    if (monitor && entry != NULL) {
+        celixThreadMutex_lock(&entry->metrics.mutex);
+
+        //timing averages are only meaningful when the message was serialized and a send was tried
+        if (status == CELIX_SUCCESS) {
+            long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
+            double diff = celix_difftime(&serializationStart, &serializationEnd);
+            double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
+            entry->metrics.averageSerializationTimeInSeconds = average;
+
+            if (entry->metrics.nrOfMessagesSend > 2) {
+                diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
+                n = entry->metrics.nrOfMessagesSend;
+                average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1);
+                entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+            }
+
+            entry->metrics.lastMessageSend = sendTime;
+        }
+
+        //the counters are updated on failures too, otherwise errors would never show up
+        entry->metrics.nrOfMessagesSend += sendCountUpdate;
+        entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
+        entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+
+        celixThreadMutex_unlock(&entry->metrics.mutex);
+    }
+
+    return status;
+}
+
+/*
+ * Websocket service `ready` callback (installed in pubsub_websocketTopicSender_create()).
+ * Stores the connection on the sender so the send path can write to it.
+ */
+static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle) {
+    //Connection succeeded so save connection to use for sending the messages
+    pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t *) handle;
+    sender->sockConnection = connection;
+}
+
+/*
+ * Websocket service `close` callback (installed in pubsub_websocketTopicSender_create()).
+ * Clears the stored connection; the send path skips sending while it is NULL.
+ */
+static void psa_websocketTopicSender_close(const struct mg_connection *connection __attribute__((unused)), void *handle) {
+    //Connection closed so reset connection
+    pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t *) handle;
+    sender->sockConnection = NULL;
+}
+
+/*
+ * Sleeps FIRST_SEND_DELAY_IN_SECONDS before the first send. The flag is a function-local
+ * static, so the delay happens once for all senders in this file, not once per sender.
+ */
+static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender) {
+    static bool firstSend = true;
+
+    if (!firstSend) {
+        return;
+    }
+
+    L_INFO("PSA_WEBSOCKET_TP: Delaying first send for late joiners...\n");
+    sleep(FIRST_SEND_DELAY_IN_SECONDS);
+    firstSend = false;
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
new file mode 100644
index 0000000..35229a8
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
+#define CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+#include "pubsub_admin_metrics.h"
+
+/* Opaque sender handle; the struct is defined in pubsub_websocket_topic_sender.c. */
+typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
+
+/**
+ * Creates a topic sender for scope/topic and registers its websocket service and
+ * publisher service factory on ctx. Returns NULL when the sender cannot be set up.
+ * NOTE(review): log_helper_t and pubsub_serializer_service_t are used here but this header
+ * does not visibly include their declarations; callers currently include them first.
+ */
+pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        pubsub_serializer_service_t *ser);
+/** Unregisters the sender's services and frees it; a NULL sender is ignored. */
+void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender);
+
+/** Accessors; the returned strings stay owned by the sender. */
+const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t *sender);
+const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t *sender);
+const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sender);
+
+/** Returns the serializer service id given at creation. */
+long pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t *sender);
+
+/**
+ * Returns a newly allocated metrics struct containing an array of
+ * pubsub_admin_sender_msg_type_metrics_t entries, one for every msg_type/bundle combination
+ * that uses the topic sender.
+ */
+pubsub_admin_sender_metrics_t* pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index d37eb23..957745b 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -172,6 +172,27 @@ add_test(NAME pubsub_tcp_endpoint_tests COMMAND pubsub_tcp_endpoint_tests WORKIN
 SETUP_TARGET_FOR_COVERAGE(pubsub_tcp_endpoint_tests_cov pubsub_tcp_endpoint_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_tcp_endpoint_tests/pubsub_tcp_endpoint_tests ..)
 endif()
 
+# Test container for the websocket pubsub admin: runs the publisher (pubsub_sut) and
+# subscriber (pubsub_tst) test bundles together with the http admin and the websocket PSA.
+# NOTE(review): unlike the neighbouring PSA test containers this block is not wrapped in a
+# build option check - confirm that is intended.
+add_celix_container(pubsub_websocket_tests
+        USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            USE_WEBSOCKETS=true
+            LISTENING_PORTS=8080
+        BUNDLES
+            Celix::pubsub_serializer_json
+            Celix::http_admin
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_websocket
+            pubsub_sut
+            pubsub_tst
+)
+# The custom launcher is a CppUTest runner, so link the test framework into the container.
+target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_websocket_tests PRIVATE ${CPPUTEST_INCLUDE_DIR})
+# Run the test from the container location so the generated config is found.
+add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>)
+SETUP_TARGET_FOR_COVERAGE(pubsub_websocket_tests_cov pubsub_websocket_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_websocket_tests/pubsub_websocket_tests ..)
+
 if (BUILD_PUBSUB_PSA_ZMQ)
     add_celix_container(pubsub_zmq_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties
index b73435d..2e13c17 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -20,6 +20,7 @@ tcp.static.bind.url=tcp://127.0.0.1:9000
 tcp.static.connect.urls=tcp://127.0.0.1:9000
 udpmc.static.bind.port=50678
 udpmc.static.connect.socket_addresses=224.100.0.1:50678
+websocket.static.connect.socket_addresses=127.0.0.1:8080
 
 #note only effective if run as root
 thread.realtime.shed=SCHED_FIFO