You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:34 UTC
[17/34] celix git commit: CELIX-454: Some bug and mem leak fixes for the PSA refactoring
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index e26754c..f0c072d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -45,23 +45,16 @@
#define L_ERROR(...) \
logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-struct pubsub_updmc_topic_receiver {
+struct pubsub_zmq_topic_receiver {
celix_bundle_context_t *ctx;
log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
char *scope;
char *topic;
zsock_t *zmqSocket;
- //serialiser svc
- long serializerTrackerId;
- struct {
-
- celix_thread_mutex_t mutex; //protect svc
- pubsub_serializer_service_t *svc;
- const celix_properties_t *props;
- } serializer;
-
struct {
celix_thread_t thread;
celix_thread_mutex_t mutex;
@@ -107,21 +100,23 @@ typedef struct msg_map_entry {
} msg_map_entry_t;
-static void pubsub_zmqTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props);
static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
-static void psa_zmq_processMsg(pubsub_updmc_topic_receiver_t *receiver, celix_array_list_t *messages);
+static void psa_zmq_processMsg(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *messages);
static void* psa_zmq_recvThread(void * data);
-pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
log_helper_t *logHelper,
const char *scope,
const char *topic,
- long serializerSvcId) {
- pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer) {
+ pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
+ receiver->serializerSvcId = serializerSvcId;
+ receiver->serializer = serializer;
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -186,7 +181,6 @@ pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_conte
receiver->scope = strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
- celixThreadMutex_create(&receiver->serializer.mutex, NULL);
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -195,20 +189,6 @@ pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_conte
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
- //track serializer svc based on the provided serializerSvcId
- if (receiver->zmqSocket != NULL ) {
- char filter[64];
- snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.filter = filter;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = receiver;
- opts.setWithProperties = pubsub_zmqTopicReceiver_setSerializer;
- receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track subscribers
if (receiver->zmqSocket != NULL ) {
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -239,39 +219,28 @@ pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_conte
return receiver;
}
-void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
+void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
if (receiver != NULL) {
+ celixThreadMutex_lock(&receiver->recvThread.mutex);
+ receiver->recvThread.running = false;
+ celixThreadMutex_unlock(&receiver->recvThread.mutex);
+ celixThread_join(receiver->recvThread.thread, NULL);
+
+ celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.svc != NULL) {
- if (entry->msgTypes != NULL) {
- receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle,
- entry->msgTypes);
- }
- } else {
- L_ERROR("Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
- }
- celixThreadMutex_unlock(&receiver->serializer.mutex);
+ receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
free(entry);
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
- celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
- celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId);
-
- celixThreadMutex_lock(&receiver->recvThread.mutex);
- receiver->recvThread.running = false;
- celixThreadMutex_unlock(&receiver->recvThread.mutex);
- celixThread_join(receiver->recvThread.thread, NULL);
-
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -284,7 +253,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
hashMap_destroy(receiver->requestedConnections.map, false, false);
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
- celixThreadMutex_destroy(&receiver->serializer.mutex);
celixThreadMutex_destroy(&receiver->subscribers.mutex);
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->recvThread.mutex);
@@ -294,29 +262,20 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
free(receiver);
}
-const char* pubsub_zmqTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) {
- return PSA_ZMQ_PUBSUB_ADMIN_TYPE;
-}
-
-const char* pubsub_zmqTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) {
- const char *result = NULL;
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.props != NULL) {
- result = celix_properties_get(receiver->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
- }
- celixThreadMutex_unlock(&receiver->serializer.mutex);
- return result;
-}
-
-const char* pubsub_zmqTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver) {
return receiver->scope;
}
-const char* pubsub_zmqTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) {
+const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver) {
return receiver->topic;
}
+long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiver) {
+ return receiver->serializerSvcId;
+}
+
+
void pubsub_zmqTopicReceiver_connectTo(
- pubsub_updmc_topic_receiver_t *receiver,
+ pubsub_zmq_topic_receiver_t *receiver,
const char *url) {
L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
@@ -338,7 +297,7 @@ void pubsub_zmqTopicReceiver_connectTo(
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *url) {
+void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) {
L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -357,39 +316,8 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *recei
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-
-static void pubsub_zmqTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_updmc_topic_receiver_t *receiver = handle;
- pubsub_serializer_service_t *ser = svc;
-
- //check if current serializer is set, if so destroy existing type maps
- celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL && entry->msgTypes != NULL) {
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.svc != NULL) {
- receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
- }
- celixThreadMutex_unlock(&receiver->serializer.mutex);
- entry->msgTypes = NULL;
- }
- }
- celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
- if (ser == NULL) {
- //TODO -> no serializer -> remove all publishers
- }
-
- celixThreadMutex_lock(&receiver->serializer.mutex);
- receiver->serializer.svc = ser;
- receiver->serializer.props = props;
- celixThreadMutex_unlock(&receiver->serializer.mutex);
-}
-
static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_updmc_topic_receiver_t *receiver = handle;
+ pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
@@ -408,22 +336,19 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
entry->usageCount = 1;
entry->svc = svc;
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.svc != NULL) {
- receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ if (rc == 0) {
+ hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
} else {
- fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ L_ERROR("[PSA_ZMQ] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ free(entry);
}
- celixThreadMutex_unlock(&receiver->serializer.mutex);
-
- hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
- pubsub_updmc_topic_receiver_t *receiver = handle;
+ pubsub_zmq_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
@@ -435,20 +360,17 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
if (entry != NULL && entry->usageCount <= 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
- celixThreadMutex_lock(&receiver->serializer.mutex);
- if (receiver->serializer.svc != NULL) {
- receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes);
- } else {
- fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+ int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
+ if (rc != 0) {
+ L_ERROR("[PSA_ZMQ] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
}
- celixThreadMutex_unlock(&receiver->serializer.mutex);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static void* psa_zmq_recvThread(void * data) {
- pubsub_updmc_topic_receiver_t *receiver = data;
+ pubsub_zmq_topic_receiver_t *receiver = data;
celixThreadMutex_lock(&receiver->recvThread.mutex);
bool running = receiver->recvThread.running;
@@ -629,7 +551,7 @@ static mp_handle_t* psa_zmq_createMultipartHandle(hash_map_pt svc_msg_db,array_l
}
-static void psa_zmq_processMsg(pubsub_updmc_topic_receiver_t *receiver, celix_array_list_t *messages) {
+static void psa_zmq_processMsg(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *messages) {
pubsub_msg_header_t *first_msg_hdr = (pubsub_msg_header_t*)zframe_data(((complete_zmq_msg_t*)celix_arrayList_get(messages,0))->header);
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index 269d1db..ea931a1 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -21,21 +21,22 @@
#include "celix_bundle_context.h"
-typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t;
+typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
-pubsub_updmc_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
log_helper_t *logHelper,
const char *scope,
const char *topic,
- long serializerSvcId);
-void pubsub_zmqTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver);
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer);
+void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver);
-const char* pubsub_zmqTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver);
+const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver);
+const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver);
-void pubsub_zmqTopicReceiver_connectTo(pubsub_updmc_topic_receiver_t *receiver, const char *url);
-void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *url);
+long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiver);
+
+void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, const char *url);
+void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url);
#endif //CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 9477055..9d44c94 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -47,6 +47,8 @@
struct pubsub_zmq_topic_sender {
celix_bundle_context_t *ctx;
log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
char *scope;
char *topic;
@@ -58,13 +60,6 @@ struct pubsub_zmq_topic_sender {
zcert_t *cert;
} zmq;
- long serTrackerId;
- struct {
- celix_thread_mutex_t mutex;
- pubsub_serializer_service_t *svc;
- const celix_properties_t *props;
- } serializer;
-
struct {
long svcId;
celix_service_factory_t factory;
@@ -97,7 +92,6 @@ typedef struct pubsub_msg {
int payloadSize;
} pubsub_msg_t;
-static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -112,12 +106,15 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
const char *scope,
const char *topic,
long serializerSvcId,
+ pubsub_serializer_service_t *ser,
const char *bindIP,
unsigned int basePort,
unsigned int maxPort) {
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
+ sender->serializerSvcId = serializerSvcId;
+ sender->serializer = ser;
//setting up zmq socket for ZMQ TopicSender
{
@@ -217,26 +214,11 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
sender->scope = strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
- celixThreadMutex_create(&sender->serializer.mutex, NULL);
celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
celixThreadMutex_create(&sender->zmq.mutex, NULL);
sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
}
- //track serializer svc based on the provided serializerSvcId
- if (sender->url != NULL ) {
- char filter[64];
- snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
-
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.filter = filter;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = sender;
- opts.setWithProperties = pubsub_zmqTopicSender_setSerializer;
- sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//register publisher services using a service factory
if (sender->url != NULL) {
sender->publisher.factory.handle = sender;
@@ -261,16 +243,30 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
if (sender != NULL) {
- celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
- celixThreadMutex_destroy(&sender->serializer.mutex);
+ celixThreadMutex_lock(&sender->boundedServices.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+ celixThreadMutex_destroy(&entry->multipart.mutex);
+ for (int i = 0; i < celix_arrayList_size(entry->multipart.parts); ++i) {
+ pubsub_msg_t *msg = celix_arrayList_get(entry->multipart.parts, i);
+ free(msg->header);
+ free(msg->payload);
+ free(msg);
+ }
+ free(entry);
+ }
+ }
+ hashMap_destroy(sender->boundedServices.map, false, false);
+ celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
celixThreadMutex_destroy(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->zmq.mutex);
- //TODO loop and cleanup
- hashMap_destroy(sender->boundedServices.map, false, false);
-
free(sender->scope);
free(sender->topic);
free(sender->url);
@@ -278,18 +274,8 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
}
}
-const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender __attribute__((unused))) {
- return PSA_ZMQ_PUBSUB_ADMIN_TYPE;
-}
-
-const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) {
- const char *result = NULL;
- celixThreadMutex_lock(&sender->serializer.mutex);
- if (sender->serializer.props != NULL) {
- result = celix_properties_get(sender->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
- }
- celixThreadMutex_unlock(&sender->serializer.mutex);
- return result;
+long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender) {
+ return sender->serializerSvcId;
}
const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender) {
@@ -313,20 +299,6 @@ void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, con
//TODO
}
-static void pubsub_zmqTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_topic_sender_t *sender = handle;
- pubsub_serializer_service_t *ser = svc;
-
- if (ser == NULL) {
- //TODO -> no serializer -> remove all publishers
- }
-
- celixThreadMutex_lock(&sender->serializer.mutex);
- sender->serializer.svc = ser;
- sender->serializer.props = props;
- celixThreadMutex_unlock(&sender->serializer.mutex);
-}
-
static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
pubsub_zmq_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
@@ -341,24 +313,19 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
entry->parent = sender;
entry->bndId = bndId;
- celixThreadMutex_lock(&sender->serializer.mutex);
- celix_status_t rc = CELIX_SUCCESS;
- if (sender->serializer.svc != NULL) {
- rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
- }
- if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
- //TODO destroy and return NULL?
- L_ERROR("Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+ int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
+ if (rc == 0) {
+ entry->service.handle = entry;
+ entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
+ entry->service.send = psa_zmq_topicPublicationSend;
+ entry->service.sendMultipart = psa_zmq_topicPublicationSendMultipart;
+ hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+ } else {
+ L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);
}
- celixThreadMutex_unlock(&sender->serializer.mutex);
- entry->service.handle = entry;
- entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
- entry->service.send = psa_zmq_topicPublicationSend;
- entry->service.sendMultipart = psa_zmq_topicPublicationSendMultipart;
- hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -377,17 +344,10 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
-
- celixThreadMutex_lock(&sender->serializer.mutex);
- celix_status_t rc = CELIX_SUCCESS;
- if (sender->serializer.svc != NULL) {
- rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes);
- }
- if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) {
+ int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes);
+ if (rc != 0) {
L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
}
- celixThreadMutex_unlock(&sender->serializer.mutex);
free(entry);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index 23a0c25..e537edd 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -29,17 +29,18 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
const char *scope,
const char *topic,
long serializerSvcId,
+ pubsub_serializer_service_t *ser,
const char *bindIP,
unsigned int basePort,
unsigned int maxPort);
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
-const char* pubsub_zmqTopicSender_psaType(pubsub_zmq_topic_sender_t *sender);
-const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_topic(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender);
+long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender);
+
void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 788e2de..a8e31ef 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -61,7 +61,6 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
celixThreadMutexAttr_destroy(&psaAttr);
- status |= celixThreadMutex_create(&manager->announcedEndpoints.mutex, NULL);
status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
@@ -70,7 +69,6 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
- manager->announcedEndpoints.map = hashMap_create(NULL, NULL, NULL, NULL);
manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
manager->announceEndpointListeners.list = celix_arrayList_create();
manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -96,31 +94,13 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
celixThreadMutex_unlock(&manager->psaHandling.mutex);
celixThread_join(manager->psaHandling.thread, NULL);
-
- celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
- if (endpoints != NULL) {
- int size = celix_arrayList_size(endpoints);
- for (int i = 0; i < size; ++i) {
- celix_properties_t *ep = celix_arrayList_get(endpoints, i);
- celix_properties_destroy(ep);
- }
- celix_arrayList_destroy(endpoints);
- }
- }
- hashMap_destroy(manager->announcedEndpoints.map, false, false);
- celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
- celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
-
celixThreadMutex_lock(&manager->pubsubadmins.mutex);
hashMap_destroy(manager->pubsubadmins.map, false, false);
celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
- iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
@@ -140,8 +120,10 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
free(entry->scopeAndTopicKey);
free(entry->scope);
free(entry->topic);
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
celix_properties_destroy(entry->subscriberProperties);
- celix_properties_destroy(entry->endpoint);
free(entry);
}
}
@@ -157,7 +139,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
free(entry->scopeAndTopicKey);
free(entry->scope);
free(entry->topic);
- celix_properties_destroy(entry->endpoint);
+ if (entry->endpoint != NULL) {
+ celix_properties_destroy(entry->endpoint);
+ }
celix_filter_destroy(entry->publisherFilter);
free(entry);
}
@@ -175,6 +159,7 @@ void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_prope
pubsub_topology_manager_t *manager = handle;
pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA");
@@ -184,6 +169,8 @@ void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_prope
celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
}
+ //NOTE new psa, so no endpoints announce yet
+
/* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher
* so no retroactively adding subscribers
*
@@ -196,26 +183,6 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
//pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- /* de-announce all publications */
- celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
- celix_array_list_t *endpointsList = hashMap_remove(manager->announcedEndpoints.map, (void*)svcId);
- celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
-
- if (endpointsList != NULL) {
- for (int i = 0; i < celix_arrayList_size(endpointsList); ++i) {
- celix_properties_t *endpoint = celix_arrayList_get(endpointsList, i);
- celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
- for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
- pubsub_announce_endpoint_listener_t *listener;
- listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
- listener->removeEndpoint(listener->handle, endpoint);
- }
- celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
- celix_properties_destroy(endpoint);
- }
- celix_arrayList_destroy(endpointsList);
- }
-
//NOTE psa shutdown will teardown topic receivers / topic senders
//de-setup all topic receivers/senders for the removed psa.
//the next psaHandling run will try to find new psa.
@@ -225,7 +192,18 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry->selectedPsaSvcId == svcId) {
- entry->setup = false;
+ /* de-announce all senders */
+ if (entry->endpoint != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+ listener->removeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+
+ entry->setup = false;
entry->selectedSerializerSvcId = -1L;
entry->selectedPsaSvcId = -1L;
if (entry->endpoint != NULL) {
@@ -241,6 +219,17 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u
while (hashMapIterator_hasNext(&iter)) {
pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry->selectedPsaSvcId == svcId) {
+ /* de-announce all receivers */
+ if (entry->endpoint != NULL) {
+ celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+ for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+ pubsub_announce_endpoint_listener_t *listener;
+ listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+ listener->removeEndpoint(listener->handle, entry->endpoint);
+ }
+ celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+ }
+
entry->setup = false;
entry->selectedSerializerSvcId = -1L;
entry->selectedPsaSvcId = -1L;
@@ -334,16 +323,25 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, vo
//1) retroactively call announceEndpoint for the endpoints of already existing topic sender/receiver entries (manager->topicSenders, manager->topicReceivers)
//2) Add listener to manager->announceEndpointListeners
- celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
+ celixThreadMutex_lock(&manager->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
- celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
- for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
- celix_properties_t *ep = celix_arrayList_get(endpoints, i);
- listener->announceEndpoint(listener->handle, ep);
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->endpoint != NULL) {
+ listener->announceEndpoint(listener->handle, entry->endpoint);
}
}
- celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+ celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+ celixThreadMutex_lock(&manager->topicReceivers.mutex);
+ iter = hashMapIterator_construct(manager->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (entry != NULL && entry->endpoint != NULL) {
+ listener->announceEndpoint(listener->handle, entry->endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&manager->topicReceivers.mutex);
celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
celix_arrayList_add(manager->announceEndpointListeners.list, listener);
@@ -361,17 +359,6 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle,
celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-
- celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter);
- for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
- celix_properties_t *ep = celix_arrayList_get(endpoints, i);
- listener->removeEndpoint(listener->handle, ep);
- }
- }
- celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
}
void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
http://git-wip-us.apache.org/repos/asf/celix/blob/b635e274/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 3cca612..d6db117 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -46,11 +46,6 @@ typedef struct pubsub_topology_manager {
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = psa svc id, value = list<celix_properties_t /*endpoint*/>
- } announcedEndpoints;
-
- struct {
- celix_thread_mutex_t mutex;
hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t
} discoveredEndpoints;