You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:35 UTC
[18/34] celix git commit: CELIX-454: Some bug and mem leak fixes for
the PSA refactoring
CELIX-454: Some bug and mem leak fixes for the PSA refactoring
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b635e274
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b635e274
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b635e274
Branch: refs/heads/develop
Commit: b635e27490fedea9197e89121407f93644b46df5
Parents: f1dfdbd
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Thu Sep 27 21:20:22 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Thu Sep 27 21:20:22 2018 +0200
----------------------------------------------------------------------
.../pubsub_admin_udp_mc/src/psa_activator.c | 16 ++
.../src/pubsub_udpmc_admin.c | 269 +++++++++++++------
.../src/pubsub_udpmc_admin.h | 3 +
.../src/pubsub_udpmc_topic_receiver.c | 111 ++------
.../src/pubsub_udpmc_topic_receiver.h | 24 +-
.../src/pubsub_udpmc_topic_sender.c | 122 +++------
.../src/pubsub_udpmc_topic_sender.h | 24 +-
.../pubsub/pubsub_admin_zmq/src/psa_activator.c | 15 ++
.../pubsub_admin_zmq/src/pubsub_zmq_admin.c | 189 ++++++++++---
.../pubsub_admin_zmq/src/pubsub_zmq_admin.h | 3 +
.../src/pubsub_zmq_topic_receiver.c | 158 +++--------
.../src/pubsub_zmq_topic_receiver.h | 21 +-
.../src/pubsub_zmq_topic_sender.c | 114 +++-----
.../src/pubsub_zmq_topic_sender.h | 5 +-
.../src/pubsub_topology_manager.c | 111 ++++----
.../src/pubsub_topology_manager.h | 5 -
16 files changed, 602 insertions(+), 588 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index 682efc5..ebe39db 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -33,6 +33,8 @@ typedef struct psa_udpmc_activator {
pubsub_udpmc_admin_t *admin;
+ long serializersTrackerId;
+
pubsub_admin_service_t adminService;
long adminSvcId;
@@ -43,6 +45,8 @@ typedef struct psa_udpmc_activator {
int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->cmdSvcId = -1L;
+ act->serializersTrackerId = -1L;
+
logHelper_create(ctx, &act->logHelper);
logHelper_start(act->logHelper);
@@ -50,6 +54,17 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+ //track serializers
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = act->admin;
+ opts.addWithProperties = pubsub_udpmcAdmin_addSerializerSvc;
+ opts.removeWithProperties = pubsub_udpmcAdmin_removeSerializerSvc;
+ act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
//register pubsub admin service
if (status == CELIX_SUCCESS) {
pubsub_admin_service_t *psaSvc = &act->adminService;
@@ -87,6 +102,7 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
int psa_udpmc_stop(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
pubsub_udpmcAdmin_destroy(act->admin);
logHelper_stop(act->logHelper);
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 1935c15..aea6de2 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -24,6 +24,7 @@
#include <netdb.h>
#include <ifaddrs.h>
#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
#include "pubsub_utils.h"
#include "pubsub_udpmc_admin.h"
@@ -35,13 +36,13 @@
#define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address"
#define PUBSUB_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port"
-#define LOG_DEBUG(...) \
+#define L_DEBUG(...) \
logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-#define LOG_INFO(...) \
+#define L_INFO(...) \
logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__);
-#define LOG_WARN(...) \
+#define L_WARN(...) \
logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__);
-#define LOG_ERROR(...) \
+#define L_ERROR(...) \
logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
struct pubsub_udpmc_admin {
@@ -57,6 +58,11 @@ struct pubsub_udpmc_admin {
const char *fwUUID;
struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map;
+ } serializers;
+
+ struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t
} topicSenders;
@@ -73,9 +79,15 @@ struct pubsub_udpmc_admin {
};
+typedef struct psa_zmq_serializer_entry {
+ const char *serType;
+ long svcId;
+ pubsub_serializer_service_t *svc;
+} psa_udpmc_serializer_entry_t;
+
static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
-static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
-static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
@@ -102,9 +114,9 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL);
if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
- LOG_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface);
+ L_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface);
} else if (psa->verbose) {
- LOG_INFO("[PSA_UDPMC] Using IP address %s", if_ip);
+ L_INFO("[PSA_UDPMC] Using IP address %s", if_ip);
}
if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
@@ -117,19 +129,19 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
if(sendSocket == -1) {
- LOG_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno));
+ L_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno));
} else {
char loop = 1;
int rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
if(rc != 0) {
- LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno));
+ L_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno));
}
if (rc == 0) {
struct in_addr multicast_interface;
inet_aton(if_ip, &multicast_interface);
rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface));
if (rc != 0) {
- LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
+ L_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
}
}
if (rc == 0) {
@@ -144,7 +156,7 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
}
if (psa->verbose) {
- LOG_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress);
+ L_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress);
}
@@ -154,13 +166,16 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
psa->mcIpAddress = strdup(PUBSUB_UDPMC_MC_IP_DEFAULT);
}
if (psa->verbose) {
- LOG_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress);
+ L_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress);
}
psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_DEFAULT_SCORE_KEY, PSA_UDPMC_DEFAULT_SCORE);
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE);
+ celixThreadMutex_create(&psa->serializers.mutex, NULL);
+ psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -183,7 +198,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ pubsub_udpmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
pubsub_udpmcTopicSender_destroy(sender);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
@@ -191,7 +206,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+ pubsub_udpmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
pubsub_udpmcTopicReceiver_destroy(recv);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -204,6 +219,14 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ iter = hashMapIterator_construct(psa->serializers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_udpmc_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
+ free(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -213,6 +236,9 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, true, false);
+ celixThreadMutex_destroy(&psa->serializers.mutex);
+ hashMap_destroy(psa->serializers.map, false, false);
+
free(psa->mcIpAddress);
free(psa->ifIpAddress);
free(psa);
@@ -220,7 +246,7 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
+ L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
@@ -231,7 +257,7 @@ celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterB
celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
+ L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
@@ -243,7 +269,7 @@ celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderB
celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
+ L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL);
if (outMatch != NULL) {
@@ -264,37 +290,41 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
- pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+ pubsub_udpmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
- sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, psa->sendSocket, psa->mcIpAddress);
- const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
- const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
- celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
- celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
- //if available also set container name
- const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
- if (cn != NULL) {
- celix_properties_set(newEndpoint, "container_name", cn);
+ psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serEntry != NULL) {
+ sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, serEntry->svc, psa->sendSocket, psa->mcIpAddress);
}
- bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
- if (!valid) {
- LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
- celix_properties_destroy(newEndpoint);
- pubsub_udpmcTopicSender_destroy(sender);
- free(key);
- } else {
+ if (sender != NULL) {
+ const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+ PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+ celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
+ celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+ if (cn != NULL) {
+ celix_properties_set(newEndpoint, "container_name", cn);
+ }
hashMap_put(psa->topicSenders.map, key, sender);
- //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender?
+ } else {
+ free(key);
+ L_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
}
} else {
free(key);
- LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+ L_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+ if (sender != NULL && newEndpoint != NULL) {
+ //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender?
+ }
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
@@ -315,12 +345,12 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
if (entry != NULL) {
char *mapKey = hashMapEntry_getKey(entry);
- pubsub_updmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
+ pubsub_udpmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
free(mapKey);
//TODO disconnect endpoints to sender. note is this needed for a udpmc topic sender?
pubsub_udpmcTopicSender_destroy(sender);
} else {
- LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+ L_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
free(key);
@@ -334,43 +364,45 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+ pubsub_udpmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
if (receiver == NULL) {
- receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId);
- const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
- const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
-
- //if available also set container name
- const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
- if (cn != NULL) {
- celix_properties_set(newEndpoint, "container_name", cn);
+ psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serEntry != NULL) {
+ receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId, serEntry->svc);
}
- bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
- if (!valid) {
- LOG_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid");
- celix_properties_destroy(newEndpoint);
- pubsub_udpmcTopicReceiver_destroy(receiver);
- free(key);
- } else {
- hashMap_put(psa->topicReceivers.map, key, receiver);
-
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
- pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ if (receiver != NULL) {
+ const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+ if (cn != NULL) {
+ celix_properties_set(newEndpoint, "container_name", cn);
}
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ hashMap_put(psa->topicReceivers.map, key, receiver);
+ } else {
+ L_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid");
+ free(key);
}
} else {
free(key);
- LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+ L_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ if (receiver != NULL && newEndpoint != NULL) {
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ }
if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
*outSubscriberEndpoint = newEndpoint;
@@ -389,7 +421,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char
free(key);
if (entry != NULL) {
char *receiverKey = hashMapEntry_getKey(entry);
- pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
hashMap_remove(psa->topicReceivers.map, receiverKey);
free(receiverKey);
@@ -401,7 +433,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char
return status;
}
-static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
@@ -434,7 +466,7 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -450,7 +482,7 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie
}
-static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_udpmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
@@ -483,7 +515,7 @@ celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_proper
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -507,37 +539,41 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
- const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
+ pubsub_udpmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ long serSvcId = pubsub_udpmcTopicSender_serializerSvcId(sender);
+ psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+ const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
const char *scope = pubsub_udpmcTopicSender_scope(sender);
const char *topic = pubsub_udpmcTopicSender_topic(sender);
const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender);
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
- fprintf(out, " |- psa type = %s\n", psaType);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- socket address = %s\n", sockAddr);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
- const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ long serSvcId = pubsub_udpmcTopicReceiver_serializerSvcId(receiver);
+ psa_udpmc_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+ const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
- fprintf(out, " |- psa type = %s\n", psaType);
fprintf(out, " |- serializer type = %s\n", serType);
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
//TODO topic receivers/senders connection count
@@ -545,6 +581,73 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
return status;
}
+void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_udpmc_admin_t *psa = handle;
+
+ const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ if (serType == NULL) {
+ L_INFO("[PSA_ZMQ] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+ return;
+ }
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ psa_udpmc_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->serType = serType;
+ entry->svcId = svcId;
+ entry->svc = svc;
+ hashMap_put(psa->serializers.map, (void*)svcId, entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_udpmcAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_udpmc_admin_t *psa = handle;
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ //remove serializer
+ // 1) First find entry and
+ // 2) loop and destroy all topic sender using the serializer and
+ // 3) loop and destroy all topic receivers using the serializer
+ // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ psa_udpmc_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+ if (entry != NULL) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_udpmc_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+ if (sender != NULL && entry->svcId == pubsub_udpmcTopicSender_serializerSvcId(sender)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_udpmcTopicSender_destroy(sender);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_udpmc_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+ if (receiver != NULL && entry->svcId == pubsub_udpmcTopicReceiver_serializerSvcId(receiver)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_udpmcTopicReceiver_destroy(receiver);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
#ifndef ANDROID
static celix_status_t udpmc_getIpAddress(const char* interface, char** ip) {
celix_status_t status = CELIX_BUNDLE_EXCEPTION;
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index e050384..02ebd44 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -50,6 +50,9 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_udpmcAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index f3e320a..24dfc08 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -36,23 +36,16 @@
#define UDP_BUFFER_SIZE 65535
#define MAX_UDP_SESSIONS 16
-struct pubsub_updmc_topic_receiver {
+struct pubsub_udpmc_topic_receiver {
celix_bundle_context_t *ctx;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
char *scope;
char *topic;
char* ifIpAddress;
largeUdp_pt largeUdpHandle;
int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
- //serialiser svc
- long serializerTrackerId;
- struct {
-
- celix_thread_mutex_t mutex; //protect svc
- pubsub_serializer_service_t *svc;
- const celix_properties_t *props;
- } serializer;
-
struct {
celix_thread_t thread;
celix_thread_mutex_t mutex;
@@ -92,20 +85,22 @@ typedef struct pubsub_msg{
unsigned int payloadSize;
} pubsub_msg_t;
-static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props);
static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
-static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
+static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
static void* psa_udpmc_recvThread(void * data);
-pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
const char *scope,
const char *topic,
const char *ifIP,
- long serializerSvcId) {
- pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer) {
+ pubsub_udpmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
+ receiver->serializerSvcId = serializerSvcId;
+ receiver->serializer = serializer;
receiver->scope = strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
receiver->ifIpAddress = strndup(ifIP, 1024 * 1024);
@@ -114,7 +109,6 @@ pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
receiver->topicEpollFd = epoll_create1(0);
- celixThreadMutex_create(&receiver->serializer.mutex, NULL);
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -122,20 +116,6 @@ pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- //track serializer svc based on the provided serializerSvcId
- {
- char filter[64];
- snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.filter = filter;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = receiver;
- opts.setWithProperties = pubsub_udpmcTopicReceiver_setSerializer;
- receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track subscribers
{
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -157,9 +137,8 @@ pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
return receiver;
}
-void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
+void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver) {
if (receiver != NULL) {
- celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId);
celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
celixThreadMutex_lock(&receiver->recvThread.mutex);
@@ -167,7 +146,6 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver)
celixThreadMutex_unlock(&receiver->recvThread.mutex);
celixThread_join(receiver->recvThread.thread, NULL);
- celixThreadMutex_destroy(&receiver->serializer.mutex);
celixThreadMutex_destroy(&receiver->subscribers.mutex);
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->recvThread.mutex);
@@ -180,29 +158,19 @@ void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver)
free(receiver);
}
-const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) {
- return PSA_UDPMC_PUBSUB_ADMIN_TYPE;
-}
-
-const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) {
- const char *result = NULL;
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.props != NULL) {
- result = celix_properties_get(receiver->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
- }
- celixThreadMutex_unlock(&receiver->serializer.mutex);
- return result;
-}
-
-const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_udpmc_topic_receiver_t *receiver) {
return receiver->scope;
}
-const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_udpmc_topic_receiver_t *receiver) {
return receiver->topic;
}
+long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver) {
+ return receiver->serializerSvcId;
+}
+
void pubsub_udpmcTopicReceiver_connectTo(
- pubsub_updmc_topic_receiver_t *receiver,
+ pubsub_udpmc_topic_receiver_t *receiver,
const char *socketAddress,
long socketPort) {
printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
@@ -261,7 +229,7 @@ void pubsub_udpmcTopicReceiver_connectTo(
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
printf("[PSA UDPMC] TopicReceiver %s/%s disconnect from socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
@@ -287,23 +255,8 @@ void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *rec
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-
-static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_updmc_topic_receiver_t *receiver = handle;
- pubsub_serializer_service_t *ser = svc;
-
- if (ser == NULL) {
- //TODO -> no serializer -> remove all publishers
- }
-
- celixThreadMutex_lock(&receiver->serializer.mutex);
- receiver->serializer.svc = ser;
- receiver->serializer.props = props;
- celixThreadMutex_unlock(&receiver->serializer.mutex);
-}
-
static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_updmc_topic_receiver_t *receiver = handle;
+ pubsub_udpmc_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -322,22 +275,19 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con
entry->usageCount = 1;
entry->svc = svc;
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.svc != NULL) {
- receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ if (rc == 0) {
+ hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
} else {
+ free(entry);
fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
}
- celixThreadMutex_unlock(&receiver->serializer.mutex);
-
- hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_updmc_topic_receiver_t *receiver = handle;
+ pubsub_udpmc_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
@@ -349,20 +299,17 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
if (entry != NULL && entry->usageCount <= 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.svc != NULL) {
- receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
- } else {
+ int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ if (rc != 0) {
fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
}
- celixThreadMutex_unlock(&receiver->serializer.mutex);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static void* psa_udpmc_recvThread(void * data) {
- pubsub_updmc_topic_receiver_t *receiver = data;
+ pubsub_udpmc_topic_receiver_t *receiver = data;
struct epoll_event events[MAX_EPOLL_EVENTS];
@@ -397,7 +344,7 @@ static void* psa_udpmc_recvThread(void * data) {
return NULL;
}
-static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) {
+static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) {
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
index ee2c113..2610489 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -20,26 +20,28 @@
#define CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
#include "celix_bundle_context.h"
+#include "pubsub_serializer.h"
-typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t;
+typedef struct pubsub_udpmc_topic_receiver pubsub_udpmc_topic_receiver_t;
-pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
const char *scope,
const char *topic,
const char *ifIP,
- long serializerSvcId);
-void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver);
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer);
+void pubsub_udpmcTopicReceiver_destroy(pubsub_udpmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_udpmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_udpmc_topic_receiver_t *receiver);
+const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_udpmc_topic_receiver_t *receiver);
+
+long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver);
void pubsub_udpmcTopicReceiver_connectTo(
- pubsub_updmc_topic_receiver_t *receiver,
+ pubsub_udpmc_topic_receiver_t *receiver,
const char *socketAddress,
long socketPort);
-void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
#endif //CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 4a6d027..3b13935 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -38,8 +38,10 @@
#define UDP_MAX_PORT 65000
-struct pubsub_updmc_topic_sender {
+struct pubsub_udpmc_topic_sender {
celix_bundle_context_t *ctx;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
char *scope;
char *topic;
char *socketAddress;
@@ -48,13 +50,6 @@ struct pubsub_updmc_topic_sender {
int sendSocket;
struct sockaddr_in destAddr;
- long serTrackerId;
- struct {
- celix_thread_mutex_t mutex;
- pubsub_serializer_service_t *svc;
- const celix_properties_t *props;
- } serializer;
-
struct {
long svcId;
celix_service_factory_t factory;
@@ -67,7 +62,7 @@ struct pubsub_updmc_topic_sender {
};
typedef struct psa_udpmc_bounded_service_entry {
- pubsub_updmc_topic_sender_t *parent;
+ pubsub_udpmc_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes;
@@ -81,26 +76,27 @@ typedef struct pubsub_msg{
unsigned int payloadSize;
} pubsub_msg_t;
-static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
static unsigned int rand_range(unsigned int min, unsigned int max);
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
celix_bundle_context_t *ctx,
const char *scope,
const char *topic,
long serializerSvcId,
+ pubsub_serializer_service_t *serializer,
int sendSocket,
const char *bindIP) {
- pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
+ pubsub_udpmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
+ sender->serializerSvcId = serializerSvcId;
+ sender->serializer = serializer;
sender->scope = strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
- celixThreadMutex_create(&sender->serializer.mutex, NULL);
celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -116,20 +112,6 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
sender->socketPort = port;
}
- //track serializer svc based on the provided serializerSvcId
- {
- char filter[64];
- snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.filter = filter;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = sender;
- opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
- sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//register publisher services using a service factory
{
sender->publisher.factory.handle = sender;
@@ -147,12 +129,10 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
return sender;
}
-void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
+void pubsub_udpmcTopicSender_destroy(pubsub_udpmc_topic_sender_t *sender) {
if (sender != NULL) {
- celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
- celixThreadMutex_destroy(&sender->serializer.mutex);
celixThreadMutex_destroy(&sender->boundedServices.mutex);
//TODO loop and cleanup?
@@ -165,62 +145,41 @@ void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
}
}
-const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender __attribute__((unused))) {
+const char* pubsub_udpmcTopicSender_psaType(pubsub_udpmc_topic_sender_t *sender __attribute__((unused))) {
return PSA_UDPMC_PUBSUB_ADMIN_TYPE;
}
-const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender) {
- const char *result = NULL;
- celixThreadMutex_lock(&sender->serializer.mutex);
- if (sender->serializer.props != NULL) {
- result = celix_properties_get(sender->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
- }
- celixThreadMutex_unlock(&sender->serializer.mutex);
- return result;
-}
-const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender) {
+const char* pubsub_udpmcTopicSender_scope(pubsub_udpmc_topic_sender_t *sender) {
return sender->scope;
}
-const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender) {
+const char* pubsub_udpmcTopicSender_topic(pubsub_udpmc_topic_sender_t *sender) {
return sender->topic;
}
-const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender) {
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_udpmc_topic_sender_t *sender) {
return sender->socketAddress;
}
-long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender) {
+long pubsub_udpmcTopicSender_socketPort(pubsub_udpmc_topic_sender_t *sender) {
return sender->socketPort;
}
-void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_udpmcTopicSender_connectTo(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
//TODO subscriber count -> topic info
}
-void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
//TODO
}
-static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_updmc_topic_sender_t *sender = handle;
- pubsub_serializer_service_t *ser = svc;
-
- if (ser == NULL) {
- //TODO -> no serializer -> remove all publishers
- }
-
- celixThreadMutex_lock(&sender->serializer.mutex);
- sender->serializer.svc = ser;
- sender->serializer.props = props;
- celixThreadMutex_unlock(&sender->serializer.mutex);
-}
-
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_updmc_topic_sender_t *sender = handle;
+ pubsub_udpmc_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
+ pubsub_publisher_t *svc = NULL;
+
celixThreadMutex_lock(&sender->boundedServices.mutex);
psa_udpmc_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId);
if (entry != NULL) {
@@ -232,32 +191,26 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
entry->bndId = bndId;
entry->largeUdpHandle = largeUdp_create(1);
- celixThreadMutex_lock(&sender->serializer.mutex);
- celix_status_t rc = CELIX_SUCCESS;
- if (sender->serializer.svc != NULL) {
- rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
- }
- if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
- //TODO destroy and return NULL?
+ int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+ if (rc == 0) {
+ entry->service.handle = entry;
+ entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
+ entry->service.send = psa_udpmc_topicPublicationSend;
+ entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
+ hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+ svc = &entry->service;
+ } else {
fprintf(stderr, "Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+ free(entry);
}
- celixThreadMutex_unlock(&sender->serializer.mutex);
-
-
- entry->service.handle = entry;
- entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
- entry->service.send = psa_udpmc_topicPublicationSend;
- entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
-
- hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
- return &entry->service;
+ return svc;
}
static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_updmc_topic_sender_t *sender = handle;
+ pubsub_udpmc_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
celixThreadMutex_lock(&sender->boundedServices.mutex);
@@ -269,17 +222,10 @@ static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
- celixThreadMutex_lock(&sender->serializer.mutex);
- celix_status_t rc = CELIX_SUCCESS;
- if (sender->serializer.svc != NULL) {
- rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
- }
- if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+ int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+ if (rc != 0) {
fprintf(stderr, "Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
}
- celixThreadMutex_unlock(&sender->serializer.mutex);
-
largeUdp_destroy(entry->largeUdpHandle);
free(entry);
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
index 89e56ec..07c1301 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -20,28 +20,30 @@
#define CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H
#include "celix_bundle_context.h"
+#include "pubsub_serializer.h"
-typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t;
+typedef struct pubsub_udpmc_topic_sender pubsub_udpmc_topic_sender_t;
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
celix_bundle_context_t *ctx,
const char *scope,
const char *topic,
long serializerSvcId,
+ pubsub_serializer_service_t *serializer,
int sendSocket,
const char *bindIP);
-void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender);
+void pubsub_udpmcTopicSender_destroy(pubsub_udpmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender);
-const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender);
-long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_scope(pubsub_udpmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_topic(pubsub_udpmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_udpmc_topic_sender_t *sender);
+long pubsub_udpmcTopicSender_socketPort(pubsub_udpmc_topic_sender_t *sender);
+
+long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender);
//TODO connections etc
-void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint);
-void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_udpmcTopicSender_connectTo(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint);
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_udpmc_topic_sender_t *sender, const celix_properties_t *endpoint);
#endif //CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index b7cc138..353c33e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -33,6 +33,8 @@ typedef struct psa_zmq_activator {
pubsub_zmq_admin_t *admin;
+ long serializersTrackerId;
+
pubsub_admin_service_t adminService;
long adminSvcId;
@@ -43,6 +45,7 @@ typedef struct psa_zmq_activator {
int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->cmdSvcId = -1L;
+ act->serializersTrackerId = -1L;
logHelper_create(ctx, &act->logHelper);
logHelper_start(act->logHelper);
@@ -50,6 +53,17 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+ //track serializers
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = act->admin;
+ opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
+ opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
+ act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
//register pubsub admin service
if (status == CELIX_SUCCESS) {
pubsub_admin_service_t *psaSvc = &act->adminService;
@@ -87,6 +101,7 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
pubsub_zmqAdmin_destroy(act->admin);
logHelper_stop(act->logHelper);
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index d3ba3b5..5b6c384 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -25,6 +25,7 @@
#include <ifaddrs.h>
#include <pubsub_endpoint.h>
#include <czmq.h>
+#include <pubsub_serializer.h>
#include "pubsub_utils.h"
#include "pubsub_zmq_admin.h"
@@ -60,24 +61,35 @@ struct pubsub_zmq_admin {
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t
+ hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
+ } serializers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t*
} topicSenders;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t
+ hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t*
} topicReceivers;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = endpoint uuid, value = psa_zmq_connected_endpoint_entry_t
+ hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
};
+typedef struct psa_zmq_serializer_entry {
+ const char *serType;
+ long svcId;
+ pubsub_serializer_service_t *svc;
+} psa_zmq_serializer_entry_t;
+
static celix_status_t zmq_getIpAddress(const char* interface, const char** ip);
-static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
-static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
@@ -148,6 +160,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_help
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
+ celixThreadMutex_create(&psa->serializers.mutex, NULL);
+ psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -178,7 +193,7 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+ pubsub_zmq_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
pubsub_zmqTopicReceiver_destroy(recv);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -191,6 +206,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ iter = hashMapIterator_construct(psa->serializers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_zmq_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
+ free(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -200,6 +223,9 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, true, false);
+ celixThreadMutex_destroy(&psa->serializers.mutex);
+ hashMap_destroy(psa->serializers.map, false, false);
+
if (psa->zmq_auth != NULL){
zactor_destroy(&psa->zmq_auth);
}
@@ -209,6 +235,73 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
free(psa);
}
+void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_zmq_admin_t *psa = handle;
+
+ const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ if (serType == NULL) {
+ L_INFO("[PSA_ZMQ] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+ return;
+ }
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ psa_zmq_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->serType = serType;
+ entry->svcId = svcId;
+ entry->svc = svc;
+ hashMap_put(psa->serializers.map, (void*)svcId, entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_zmq_admin_t *psa = handle;
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ //remove serializer
+ // 1) First find entry and
+ // 2) loop and destroy all topic sender using the serializer and
+ // 3) loop and destroy all topic receivers using the serializer
+ // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ psa_zmq_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId);
+ if (entry != NULL) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+ if (sender != NULL && entry->svcId == pubsub_zmqTopicSender_serializerSvcId(sender)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_zmqTopicSender_destroy(sender);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+ if (receiver != NULL && entry->svcId == pubsub_zmqTopicReceiver_serializerSvcId(receiver)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_zmqTopicReceiver_destroy(receiver);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
@@ -255,13 +348,19 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
- sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, psa->ipAddress, psa->basePort, psa->maxPort);
+ psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serEntry != NULL) {
+ sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc, psa->ipAddress,
+ psa->basePort, psa->maxPort);
+ }
if (sender != NULL) {
- const char *psaType = pubsub_zmqTopicSender_psaType(sender);
- const char *serType = pubsub_zmqTopicSender_serializerType(sender);
+ const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
serType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
@@ -270,8 +369,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
if (cn != NULL) {
celix_properties_set(newEndpoint, "container_name", cn);
}
-
- //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
hashMap_put(psa->topicSenders.map, key, sender);
} else {
L_ERROR("[PSA ZMQ] Error creating a TopicSender");
@@ -282,7 +379,11 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+ if (sender != NULL && newEndpoint != NULL) {
+ //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
+ }
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
@@ -322,30 +423,26 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+ pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
if (receiver == NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId);
+ psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+ if (serEntry != NULL) {
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+ } else {
+ L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
+ }
if (receiver != NULL) {
- const char *psaType = pubsub_zmqTopicReceiver_psaType(receiver);
- const char *serType = pubsub_zmqTopicReceiver_serializerType(receiver);
+ const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
+ const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
-
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
celix_properties_set(newEndpoint, "container_name", cn);
}
-
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
- pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
- }
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
-
hashMap_put(psa->topicReceivers.map, key, receiver);
} else {
L_ERROR("[PSA ZMQ] Error creating a TopicReceiver.");
@@ -356,7 +453,17 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ if (receiver != NULL && newEndpoint != NULL) {
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+ pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ }
if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
*outSubscriberEndpoint = newEndpoint;
@@ -375,7 +482,7 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
free(key);
if (entry != NULL) {
char *receiverKey = hashMapEntry_getKey(entry);
- pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+ pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
hashMap_remove(psa->topicReceivers.map, receiverKey);
free(receiverKey);
@@ -387,7 +494,7 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
return status;
}
-static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
@@ -419,7 +526,7 @@ celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -435,7 +542,7 @@ celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_
}
-static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
@@ -467,7 +574,7 @@ celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properti
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
pubsub_zmqAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -491,37 +598,41 @@ celix_status_t pubsub_zmqAdmin_executeCommand(void *handle, char *commandLine __
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- const char *psaType = pubsub_zmqTopicSender_psaType(sender);
- const char *serType = pubsub_zmqTopicSender_serializerType(sender);
+ long serSvcId = pubsub_zmqTopicSender_serializerSvcId(sender);
+ psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+ const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
const char *scope = pubsub_zmqTopicSender_scope(sender);
const char *topic = pubsub_zmqTopicSender_topic(sender);
const char *url = pubsub_zmqTopicSender_url(sender);
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
- fprintf(out, " |- psa type = %s\n", psaType);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- url = %s\n", url);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
+ celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- const char *psaType = pubsub_zmqTopicReceiver_psaType(receiver);
- const char *serType = pubsub_zmqTopicReceiver_serializerType(receiver);
+ pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ long serSvcId = pubsub_zmqTopicReceiver_serializerSvcId(receiver);
+ psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+ const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
- fprintf(out, " |- psa type = %s\n", psaType);
fprintf(out, " |- serializer type = %s\n", serType);
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
//TODO topic receivers/senders connection count
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index a1c5a75..180a9db 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -53,6 +53,9 @@ celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *s
celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
celix_status_t pubsub_zmqAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H