You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/04/14 08:14:05 UTC
[4/8] celix git commit: CELIX-408: Refactoring in the pubsub
serializer usage. The topology manager does not have to be started before the
pubsub admins anymore
CELIX-408: Refactoring in the pubsub serializer usage. The topology manager does not have to be started before the pubsub admins anymore.
Known issues:
- The serializer has to be started before the admins, to prevent issues when using an admin without a serializer. Note: an admin should only register itself if a serializer is available.
- Still considered unstable
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2d9c77d5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2d9c77d5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2d9c77d5
Branch: refs/heads/develop
Commit: 2d9c77d530184d5d3d119cd3f92202e332996c78
Parents: 7efe433
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Wed Apr 12 13:27:09 2017 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Wed Apr 12 13:27:09 2017 +0200
----------------------------------------------------------------------
pubsub/deploy/CMakeLists.txt | 50 ++++----
.../include/pubsub_publish_service_private.h | 4 +-
.../private/src/pubsub_admin_impl.c | 5 +-
.../private/src/topic_publication.c | 104 +++++++++-------
.../private/src/topic_subscription.c | 57 ++++++---
.../include/pubsub_publish_service_private.h | 4 +-
.../private/include/topic_subscription.h | 2 +-
.../private/src/pubsub_admin_impl.c | 7 +-
.../private/src/topic_publication.c | 121 +++++++++++--------
.../private/src/topic_subscription.c | 102 ++++++++++------
.../private/src/pubsub_serializer_impl.c | 9 +-
.../private/src/pubsub_topology_manager.c | 11 +-
pubsub/test/CMakeLists.txt | 14 +--
pubsub/test/msg_descriptors/msg.descriptor | 3 +-
pubsub/test/test/msg.h | 4 +-
pubsub/test/test/sut_activator.c | 5 +-
pubsub/test/test/tst_activator.cpp | 13 +-
17 files changed, 322 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/deploy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/deploy/CMakeLists.txt b/pubsub/deploy/CMakeLists.txt
index 52fecc9..85b49d2 100644
--- a/pubsub/deploy/CMakeLists.txt
+++ b/pubsub/deploy/CMakeLists.txt
@@ -24,12 +24,12 @@ add_deploy("pubsub_publisher_udp_mc"
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_publisher.PoiPublisher2
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
)
add_deploy("pubsub_subscriber_udp_mc"
@@ -37,11 +37,11 @@ add_deploy("pubsub_subscriber_udp_mc"
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_subscriber.PoiSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
)
add_deploy("pubsub_subscriber2_udp_mc"
@@ -49,11 +49,11 @@ add_deploy("pubsub_subscriber2_udp_mc"
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_subscriber.PoiSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
)
if (ETCD_CMD AND XTERM_CMD)
@@ -81,13 +81,13 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_publisher.PoiPublisher2
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
PROPERTIES
poi1.psa=zmq
poi2.psa=udp
@@ -98,12 +98,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_admin.PubSubAdminUdpMc
org.apache.celix.pubsub_subscriber.PoiSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
PROPERTIES
poi1.psa=zmq
poi2.psa=udp
@@ -115,12 +115,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_subscriber.PoiSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
)
add_deploy("pubsub_publisher_zmq"
@@ -128,12 +128,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_publisher.PoiPublisher
org.apache.celix.pubsub_publisher.PoiPublisher2
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
PROPERTIES
pubsub.scope=my_small_scope
)
@@ -143,11 +143,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_subscriber.PoiSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
)
add_deploy("pubsub_subscriber2_zmq"
@@ -155,11 +155,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_subscriber.PoiSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
+
)
# ZMQ Multipart
@@ -168,23 +169,23 @@ if (BUILD_PUBSUB_PSA_ZMQ)
BUNDLES
shell
shell_tui
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_subscriber.MpSubscriber
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
)
add_deploy("pubsub_mp_publisher_zmq"
GROUP "pubsub"
BUNDLES
- shell
- shell_tui
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
- org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
- org.apache.celix.pubsub_admin.PubSubAdminZmq
- org.apache.celix.pubsub_publisher.MpPublisher
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ shell
+ shell_tui
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+ org.apache.celix.pubsub_admin.PubSubAdminZmq
+ org.apache.celix.pubsub_publisher.MpPublisher
)
if (ETCD_CMD AND XTERM_CMD)
@@ -207,6 +208,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
GROUP pubsub
DEPLOYMENTS
pubsub_publisher
+ pubsub_subscriber_zmq
pubsub_subscriber2_zmq
COMMANDS
etcd
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 57d942a..b43fb08 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -42,13 +42,13 @@ typedef struct pubsub_udp_msg {
} pubsub_udp_msg_t;
typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index ebfe3e6..3693970 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -431,7 +431,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
if (factory == NULL) {
topic_publication_pt pub = NULL;
- status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->serializerSvc, admin->mcIpAddress,&pub);
+ status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->mcIpAddress,&pub);
+ pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc);
if(status == CELIX_SUCCESS){
status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
if(status==CELIX_SUCCESS && factory !=NULL){
@@ -655,7 +656,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
while(hashMapIterator_hasNext(lp_iter)){
service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
- pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
+ pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc);
}
hashMapIterator_destroy(lp_iter);
celixThreadMutex_unlock(&admin->localPublicationsLock);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index be0a433..227761b 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -63,7 +63,7 @@ struct topic_publication {
typedef struct publish_bundle_bound_service {
topic_publication_pt parent;
- pubsub_publisher_pt service;
+ pubsub_publisher_t pubSvc;
bundle_pt bundle;
char *scope;
char *topic;
@@ -97,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
static void delay_first_send_for_late_joiners(void);
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){
char* ep = malloc(EP_ADDRESS_LEN);
memset(ep,0,EP_ADDRESS_LEN);
@@ -116,7 +116,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
pub->destAddr.sin_family = AF_INET;
pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
pub->destAddr.sin_port = htons(port);
- pub->serializerSvc = serializer;
+ pub->serializerSvc = NULL;
pubsub_topicPublicationAddPublisherEP(pub,pubEP);
@@ -222,30 +222,36 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
return CELIX_SUCCESS;
}
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&(pub->tp_lock));
//clear old serializer
if (pub->serializerSvc != NULL) {
- hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle_pt, publish_bundle_bound_service_t*
while (hashMapIterator_hasNext(&iter)) {
publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+ celixThreadMutex_lock(&bound->mp_lock);
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ celixThreadMutex_unlock(&bound->mp_lock);
bound->map = NULL;
}
}
//setup new serializer
pub->serializerSvc = serializerSvc;
- hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
- bundle_pt bundle = hashMapEntry_getKey(entry);
- publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
- pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
- }
+ if (pub->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ bundle_pt bundle = hashMapEntry_getKey(entry);
+ publish_bundle_bound_service_t *bound = hashMapEntry_getValue(entry);
+ celixThreadMutex_lock(&bound->mp_lock);
+ pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
+ celixThreadMutex_unlock(&bound->mp_lock);
+ }
+ }
celixThreadMutex_unlock(&(pub->tp_lock));
@@ -260,7 +266,9 @@ celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub,
hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
while (hashMapIterator_hasNext(&iter)) {
publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+ celixThreadMutex_lock(&bound->mp_lock);
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ celixThreadMutex_unlock(&bound->mp_lock);
bound->map = NULL;
}
}
@@ -294,7 +302,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
}
if (bound != NULL) {
- *service = bound->service;
+ *service = &bound->pubSvc;
}
celixThreadMutex_unlock(&(publish->tp_lock));
@@ -374,13 +382,21 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
if (bound->map == NULL) {
printf("TP: Serializer is not set!\n");
+ status = 1;
} else if (msgSer == NULL ){
printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+ hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+ printf("Note supported messages:\n");
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+ printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
+ }
+ status = 1;
}
int major=0, minor=0;
- if (msgSer != NULL) {
+ if (status == 0 && msgSer != NULL) {
pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
msg_hdr->type = msgTypeId;
@@ -407,9 +423,6 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
free(msg);
free(serializedOutput);
- } else {
- printf("TP: Message %u not supported.\n",msgTypeId);
- status=-1;
}
celixThreadMutex_unlock(&(bound->mp_lock));
@@ -418,9 +431,30 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
return status;
}
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
- *msgTypeId = utils_stringHash(msgType);
- return 0;
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){
+ publish_bundle_bound_service_t* bound = handle;
+ unsigned int msgTypeId = 0;
+
+ celixThreadMutex_lock(&bound->mp_lock);
+ if (bound->map != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+ if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
+ msgTypeId = msgSer->msgId;
+ break;
+ }
+ }
+ }
+ celixThreadMutex_unlock(&bound->mp_lock);
+
+ if (msgTypeId != 0) {
+ *out = msgTypeId;
+ return 0;
+ } else {
+ printf("TP: Cannot find msg type id for msg type %s\n", msgType);
+ return 1;
+ }
}
@@ -432,15 +466,10 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
}
static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) {
-
- publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
+ //PRECOND lock on publish->tp_lock
+ publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
if (bound != NULL) {
- bound->service = calloc(1, sizeof(*bound->service));
- }
-
- if (bound != NULL && bound->service != NULL) {
-
bound->parent = tp;
bound->bundle = bundle;
bound->getCount = 1;
@@ -452,21 +481,17 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
bound->scope=strdup(pubEP->scope);
bound->topic=strdup(pubEP->topic);
bound->largeUdpHandle = largeUdp_create(1);
- bound->service->handle = bound;
- bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
- bound->service->send = pubsub_topicPublicationSend;
- bound->service->sendMultipart = NULL; //Multipart not supported (jet) for UDP
+ bound->pubSvc.handle = bound;
+ bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+ bound->pubSvc.send = pubsub_topicPublicationSend;
+ bound->pubSvc.sendMultipart = NULL; //Multipart not supported (jet) for UDP
- //TODO check if lock on tp is needed? (e.g. is lock already done by caller?)
- if (tp->serializerSvc != NULL) {
+ if (tp->serializerSvc != NULL) {
tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
}
}
else
{
- if (bound != NULL) {
- free(bound->service);
- }
free(bound);
return NULL;
}
@@ -475,14 +500,9 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
}
static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) {
-
+ //PRECOND lock on publish->tp_lock
celixThreadMutex_lock(&boundSvc->mp_lock);
- if (boundSvc->service != NULL) {
- free(boundSvc->service);
- }
-
- //TODO check if lock on parent is needed, e.g. does the caller already lock?
if (boundSvc->map != NULL) {
if (boundSvc->parent->serializerSvc == NULL) {
printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n");
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index da23b21..a424112 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -29,7 +29,6 @@
#include <unistd.h>
#include <signal.h>
-#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
@@ -57,7 +56,7 @@
#define UDP_BUFFER_SIZE 65535
#define MAX_UDP_SESSIONS 16
-struct topic_subscription{
+struct topic_subscription {
char* ifIpAddress;
service_tracker_pt tracker;
@@ -70,7 +69,8 @@ struct topic_subscription{
//NOTE. using a service ptr can be dangerous, because pointer can be reused.
//ensuring that pointer are removed before new (refurbish) pionter comes along is crucial!
- hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+ hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+ hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
hash_map_pt socketMap; // key = URL, value = listen-socket
unsigned int nrSubscribers;
@@ -118,7 +118,8 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
celixThreadMutex_create(&ts->ts_lock,NULL);
arrayList_create(&ts->sub_ep_list);
- ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
+ ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
+ ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -164,7 +165,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
serviceTracker_destroy(ts->tracker);
arrayList_clear(ts->sub_ep_list);
arrayList_destroy(ts->sub_ep_list);
- hashMap_destroy(ts->msgSerializersMap,false,false);
+ hashMap_destroy(ts->msgSerializerMapMap, false, false);
+ hashMap_destroy(ts->bundleMap, false, false);
hashMap_destroy(ts->socketMap,false,false);
largeUdp_destroy(ts->largeUdpHandle);
@@ -394,13 +396,34 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
return ts->nrSubscribers;
}
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
+ //clear old
+ if (ts->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+ pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+ hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+ }
+ }
ts->serializerSvc = serializerSvc;
-
+ //init new
+ if (ts->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
+ bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
+ pubsub_msg_serializer_map_t* map = NULL;
+ ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+ hashMap_put(ts->msgSerializerMapMap, subsvc, map);
+ }
+ }
celixThreadMutex_unlock(&ts->ts_lock);
return status;
@@ -411,10 +434,13 @@ celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts
celixThreadMutex_lock(&ts->ts_lock);
if (ts->serializerSvc == serializerSvc) { //only act if svc removed is services used
- hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap);
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+ pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+ hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
}
ts->serializerSvc = NULL;
}
@@ -428,7 +454,7 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
topic_subscription_pt ts = handle;
celixThreadMutex_lock(&ts->ts_lock);
- if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
+ if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
bundle_pt bundle = NULL;
serviceReference_getBundle(reference, &bundle);
@@ -436,7 +462,8 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
pubsub_msg_serializer_map_t* map = NULL;
ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
if (map != NULL) {
- hashMap_put(ts->msgSerializersMap, svc, map);
+ hashMap_put(ts->msgSerializerMapMap, svc, map);
+ hashMap_put(ts->bundleMap, svc, bundle);
}
}
}
@@ -452,10 +479,12 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
celixThreadMutex_lock(&ts->ts_lock);
- if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
- pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc);
+ if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+ pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc);
if (ts->serializerSvc != NULL){
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+ hashMap_remove(ts->bundleMap, svc);
+ hashMap_remove(ts->msgSerializerMapMap, svc);
}
}
celixThreadMutex_unlock(&ts->ts_lock);
@@ -467,7 +496,7 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
- hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap);
+ hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap);
celixThreadMutex_lock(&sub->ts_lock);
while (hashMapIterator_hasNext(&iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index 158dfe7..dbd2ff1 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,13 +34,13 @@
typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index 1fbbaaf..c1e78c3 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -51,7 +51,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 5c9a5d5..6095d8a 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -444,7 +444,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
if (factory == NULL) {
topic_publication_pt pub = NULL;
- status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->serializerSvc, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+ status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+ pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc);
if (status == CELIX_SUCCESS) {
status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
if (status == CELIX_SUCCESS && factory != NULL) {
@@ -676,7 +677,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
while(hashMapIterator_hasNext(lp_iter)){
service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
- pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
+ pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc);
}
hashMapIterator_destroy(lp_iter);
celixThreadMutex_unlock(&admin->localPublicationsLock);
@@ -686,7 +687,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
while(hashMapIterator_hasNext(subs_iter)){
topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
- pubsub_topicSubscriptionAddSerializer(topic_sub, admin->serializerSvc);
+ pubsub_topicSubscriptionSetSerializer(topic_sub, admin->serializerSvc);
}
hashMapIterator_destroy(subs_iter);
celixThreadMutex_unlock(&admin->subscriptionsLock);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 2e95874..28bf56e 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -74,7 +74,7 @@ struct topic_publication {
typedef struct publish_bundle_bound_service {
topic_publication_pt parent;
- pubsub_publisher_pt service;
+ pubsub_publisher_t pubSvc;
bundle_pt bundle;
char *topic;
pubsub_msg_serializer_map_t* map;
@@ -104,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
static void delay_first_send_for_late_joiners(void);
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
celix_status_t status = CELIX_SUCCESS;
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -209,7 +209,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
pub->endpoint = ep;
pub->zmq_socket = socket;
- pub->serializerSvc = serializer;
+ pub->serializerSvc = NULL;
#ifdef BUILD_WITH_ZMQ_SECURITY
if (pubEP->is_secure){
@@ -298,9 +298,7 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
celix_status_t status = CELIX_SUCCESS;
//celixThreadMutex_lock(&(pub->tp_lock));
-
status = serviceRegistration_unregister(pub->svcFactoryReg);
-
//celixThreadMutex_unlock(&(pub->tp_lock));
return status;
@@ -331,7 +329,7 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
return CELIX_SUCCESS;
}
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&(pub->tp_lock));
@@ -341,19 +339,25 @@ celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pu
hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
while (hashMapIterator_hasNext(&iter)) {
publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
- pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ celixThreadMutex_lock(&bound->mp_lock);
+ pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ celixThreadMutex_unlock(&bound->mp_lock);
bound->map = NULL;
}
}
pub->serializerSvc = serializerSvc;
- hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
- bundle_pt bundle = hashMapEntry_getKey(entry);
- publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry);
- pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map);
- }
+ if (pub->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ bundle_pt bundle = hashMapEntry_getKey(entry);
+ publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
+ celixThreadMutex_lock(&bound->mp_lock);
+ pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
+ celixThreadMutex_unlock(&bound->mp_lock);
+ }
+ }
celixThreadMutex_unlock(&(pub->tp_lock));
@@ -367,9 +371,11 @@ celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub,
if (pub->serializerSvc == svc) {
hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
while (hashMapIterator_hasNext(&iter)) {
- publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter);
- pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map);
- boundSvc->map = NULL;
+ publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+ celixThreadMutex_lock(&bound->mp_lock);
+ pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+ celixThreadMutex_unlock(&bound->mp_lock);
+ bound->map = NULL;
}
pub->serializerSvc = NULL;
}
@@ -402,7 +408,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
}
if(bound!=NULL){
- *service = bound->service;
+ *service = &bound->pubSvc;
}
celixThreadMutex_unlock(&(publish->tp_lock));
@@ -489,29 +495,41 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
}
static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
-
return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
-
}
static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){
int status = 0;
publish_bundle_bound_service_t* bound = handle;
-
celixThreadMutex_lock(&(bound->mp_lock));
+
if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
printf("TP: Multipart send already in progress. Cannot process a new one.\n");
celixThreadMutex_unlock(&(bound->mp_lock));
return -3;
- }
+ }
pubsub_msg_serializer_t* msgSer = NULL;
if (bound->map != NULL) {
msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId);
}
- int major=0, minor=0;
- if (msgSer != NULL) {
+ if (bound->map == NULL) {
+ printf("TP: Serializer is not set!\n");
+ status = 1;
+ } else if (msgSer == NULL ){
+ printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+ hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+ printf("Note supported messages:\n");
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+ printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
+ }
+ status = 1;
+ }
+
+ int major=0, minor=0;
+ if (status == 0 && msgSer != NULL) {
pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
msg_hdr->type = msgTypeId;
@@ -579,19 +597,38 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
}
} else {
- printf("TP: Message %u not supported.",msgTypeId);
+ printf("TP: Message %u not supported.\n",msgTypeId);
status=-1;
}
celixThreadMutex_unlock(&(bound->mp_lock));
-
return status;
-
}
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
- *msgTypeId = utils_stringHash(msgType);
- return 0;
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){
+ publish_bundle_bound_service_t* bound = handle;
+ unsigned int msgTypeId = 0;
+
+ celixThreadMutex_lock(&bound->mp_lock);
+ if (bound->map != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+ if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
+ msgTypeId = msgSer->msgId;
+ break;
+ }
+ }
+ }
+ celixThreadMutex_unlock(&bound->mp_lock);
+
+ if (msgTypeId != 0) {
+ *out = msgTypeId;
+ return 0;
+ } else {
+ printf("TP: Cannot find msg type id for msg type %s\n", msgType);
+ return 1;
+ }
}
static unsigned int rand_range(unsigned int min, unsigned int max){
@@ -602,14 +639,11 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
}
static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+ //PRECOND lock on tp->lock
publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
if (bound != NULL) {
- bound->service = calloc(1, sizeof(*bound->service));
- }
-
- if (bound != NULL && bound->service != NULL) {
bound->parent = tp;
bound->bundle = bundle;
@@ -617,7 +651,6 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
bound->mp_send_in_progress = false;
celixThreadMutex_create(&bound->mp_lock,NULL);
- //TODO check if lock is needed. e.g. was the caller already locked?
if (tp->serializerSvc != NULL) {
tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
}
@@ -626,17 +659,13 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
bound->topic=strdup(pubEP->topic);
- bound->service->handle = bound;
- bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
- bound->service->send = pubsub_topicPublicationSend;
- bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
-
+ bound->pubSvc.handle = bound;
+ bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+ bound->pubSvc.send = pubsub_topicPublicationSend;
+ bound->pubSvc.sendMultipart = pubsub_topicPublicationSendMultipart;
}
else
{
- if (bound != NULL) {
- free(bound->service);
- }
free(bound);
return NULL;
}
@@ -645,13 +674,9 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
}
static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){
-
+ //PRECOND lock on publish->tp_lock
celixThreadMutex_lock(&boundSvc->mp_lock);
- if (boundSvc->service != NULL) {
- free(boundSvc->service);
- }
-
if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
boundSvc->map = NULL;
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 7ef2c5d..537fbe5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -69,7 +69,8 @@ struct topic_subscription {
celix_thread_mutex_t ts_lock;
bundle_context_pt context;
- hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t*
+ hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+ hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
array_list_pt pendingConnections;
array_list_pt pendingDisconnections;
@@ -220,7 +221,8 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
celixThreadMutex_create(&ts->socket_lock, NULL);
celixThreadMutex_create(&ts->ts_lock,NULL);
arrayList_create(&ts->sub_ep_list);
- ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
+ ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
+ ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
arrayList_create(&ts->pendingConnections);
arrayList_create(&ts->pendingDisconnections);
celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -266,7 +268,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
serviceTracker_destroy(ts->tracker);
arrayList_clear(ts->sub_ep_list);
arrayList_destroy(ts->sub_ep_list);
- hashMap_destroy(ts->msgSerializers,false,false);
+ hashMap_destroy(ts->msgSerializerMapMap,false,false);
+ hashMap_destroy(ts->bundleMap,false,false);
celixThreadMutex_lock(&ts->pendingConnections_lock);
arrayList_destroy(ts->pendingConnections);
@@ -426,13 +429,34 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
return ts->nrSubscribers;
}
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicSubscriptionSetserializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
-
- ts->serializerSvc = serializerSvc;
-
+ //clear old
+ if (ts->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+ pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+ hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+
+ }
+ }
+ ts->serializerSvc = serializerSvc;
+ //init new
+ if (ts->serializerSvc != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
+ bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
+ pubsub_msg_serializer_map_t* map = NULL;
+ ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+ hashMap_put(ts->msgSerializerMapMap, subsvc, map);
+ }
+ }
celixThreadMutex_unlock(&ts->ts_lock);
return status;
@@ -442,53 +466,59 @@ celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&ts->ts_lock);
- if (ts->serializerSvc == svc) {
- hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers);
- while(hashMapIterator_hasNext(&iter)){
- pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
- ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
- }
- }
- ts->serializerSvc = NULL;
+ if (ts->serializerSvc == svc) { //only act if svc removed is services used
+ hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+ pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+ pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+ hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+ }
+ ts->serializerSvc = NULL;
+ }
celixThreadMutex_unlock(&ts->ts_lock);
return status;
}
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc) {
celix_status_t status = CELIX_SUCCESS;
topic_subscription_pt ts = handle;
celixThreadMutex_lock(&ts->ts_lock);
- if (!hashMap_containsKey(ts->msgSerializers, service)) {
- bundle_pt bundle = NULL;
- serviceReference_getBundle(reference, &bundle);
-
- if (ts->serializerSvc != NULL) {
- pubsub_msg_serializer_map_t* map = NULL;
- ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
- if (map != NULL) {
- hashMap_put(ts->msgSerializers, service, map);
- } else {
- printf("TS: Cannot create msg serializer map\n");
- }
- }
- }
+ if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+ bundle_pt bundle = NULL;
+ serviceReference_getBundle(reference, &bundle);
+
+ if (ts->serializerSvc != NULL) {
+ pubsub_msg_serializer_map_t* map = NULL;
+ ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+ if (map != NULL) {
+ hashMap_put(ts->msgSerializerMapMap, svc, map);
+ hashMap_put(ts->bundleMap, svc, bundle);
+ }
+ }
+ }
celixThreadMutex_unlock(&ts->ts_lock);
printf("TS: New subscriber registered.\n");
return status;
}
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc) {
celix_status_t status = CELIX_SUCCESS;
topic_subscription_pt ts = handle;
celixThreadMutex_lock(&ts->ts_lock);
- if (hashMap_containsKey(ts->msgSerializers, service)) {
- pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service);
- ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
- }
+ if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+ pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc);
+ if (ts->serializerSvc != NULL){
+ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+ hashMap_remove(ts->bundleMap, svc);
+ hashMap_remove(ts->msgSerializerMapMap, svc);
+ }
+ }
celixThreadMutex_unlock(&ts->ts_lock);
printf("TS: Subscriber unregistered.\n");
@@ -500,7 +530,7 @@ static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
- hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers);
+ hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap);
while (hashMapIterator_hasNext(&iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 2dd8258..6145a38 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -250,14 +250,17 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
if (clash) {
printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+ free(impl);
+ dynMessage_destroy(msgType);
} else if ( msgName != NULL && msgVersion != NULL && msgId != 0) {
hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
} else {
- printf("Error adding creating msg serializer\n");
+ printf("Error creating msg serializer\n");
+ free(impl);
+ dynMessage_destroy(msgType);
}
- }
- else{
+ } else{
printf("DMU: cannot parse message from descriptor %s\n.",path);
}
fclose(stream);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 36ea422..6047cf8 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -226,10 +226,20 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
celixThreadMutex_unlock(&manager->publicationsLock);
+
+ celixThreadMutex_lock(&manager->serializerListLock);
+ unsigned int size = arrayList_size(manager->serializerList);
+ if (size > 0) {
+ pubsub_serializer_service_t* ser = arrayList_get(manager->serializerList, (size-1)); //last, same as result of add/remove serializer
+ new_psa->setSerializer(new_psa->admin, ser);
+ }
+ celixThreadMutex_unlock(&manager->serializerListLock);
+
celixThreadMutex_lock(&manager->psaListLock);
arrayList_add(manager->psaList, new_psa);
celixThreadMutex_unlock(&manager->psaListLock);
+
return status;
}
@@ -335,7 +345,6 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added pubsub serializer");
int i;
-
for(i=0; i<arrayList_size(manager->psaList); i++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,i);
psa->setSerializer(psa->admin, new_serializer);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/test/CMakeLists.txt b/pubsub/test/CMakeLists.txt
index 7cd0003..3b1655b 100644
--- a/pubsub/test/CMakeLists.txt
+++ b/pubsub/test/CMakeLists.txt
@@ -38,11 +38,11 @@ bundle_files(pubsub_sut
add_deploy(pubsub_udpmc_sut
NAME deploy_sut
BUNDLES
+ org.apache.celix.pubsub_serializer.PubSubSerializerJson
org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ #org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+ org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
- org.apache.celix.pubsub_admin.PubSubAdminUdpMc
- #org.apache.celix.pubsub_admin.PubSubAdminZmq
- org.apache.celix.pubsub_serializer.PubSubSerializerJson
pubsub_sut
DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
)
@@ -60,11 +60,11 @@ bundle_files(pubsub_tst
add_deploy(pubsub_udpmc_tst
NAME deploy_tst
BUNDLES
- org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
- org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
- org.apache.celix.pubsub_admin.PubSubAdminUdpMc
- #org.apache.celix.pubsub_admin.PubSubAdminZmq
org.apache.celix.pubsub_serializer.PubSubSerializerJson
+ org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+ #org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+ org.apache.celix.pubsub_admin.PubSubAdminZmq
pubsub_tst
DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
LAUNCHER celix_test_runner
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor
index 03b15ba..0eb28cb 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -3,6 +3,7 @@ type=message
name=msg
version=1.0.0
:annotations
+classname=org.example.Msg
:types
:message
-{n seqnR}
+{i seqnR}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/msg.h
----------------------------------------------------------------------
diff --git a/pubsub/test/test/msg.h b/pubsub/test/test/msg.h
index babfd1f..49169c5 100644
--- a/pubsub/test/test/msg.h
+++ b/pubsub/test/test/msg.h
@@ -20,8 +20,10 @@
#ifndef MSG_H
#define MSG_H
+#include <stdint.h>
+
typedef struct msg {
- int seqNr;
+ uint32_t seqNr;
} msg_t;
#endif //MSG_H
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/sut_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/test/test/sut_activator.c b/pubsub/test/test/sut_activator.c
index 3e3b33b..6f02e79 100644
--- a/pubsub/test/test/sut_activator.c
+++ b/pubsub/test/test/sut_activator.c
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <stdlib.h>
+#include <constants.h>
#include "bundle_activator.h"
#include "service_tracker.h"
@@ -57,7 +58,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
act->reg = NULL;
bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &act->subSvc, props, &act->reg);
- const char* filter = "(&(objectClass=pubsub.publisher)(pubsub.topic=pong))";
+ char filter[512];
+ snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "pong");
+
service_tracker_customizer_pt customizer = NULL;
serviceTrackerCustomizer_create(act, NULL, sut_pubAdded, NULL, sut_pubRemoved, &customizer);
serviceTracker_createWithFilter(context, filter, customizer, &act->tracker);
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp
index 266f73e..2cf5309 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -32,7 +32,7 @@
#include <CppUTest/TestHarness.h>
#include <CppUTestExt/MockSupport.h>
-
+#include <constants.h>
static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
@@ -70,8 +70,10 @@ celix_status_t bundleActivator_start(__attribute__((unused)) void * userData, bu
g_act.subSvc.receive = tst_receive;
bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &g_act.subSvc, props, &g_act.reg);
- const char* filter = "(&(objectClass=pubsub.publisher)(pubsub.topic=ping))";
- service_tracker_customizer_pt customizer = NULL;
+ char filter[512];
+ snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "ping");
+
+ service_tracker_customizer_pt customizer = NULL;
serviceTrackerCustomizer_create(&g_act, NULL, tst_pubAdded, NULL, tst_pubRemoved, &customizer);
serviceTracker_createWithFilter(context, filter, customizer, &g_act.tracker);
serviceTracker_open(g_act.tracker);
@@ -157,7 +159,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
if (count > 0) {
break;
} else {
- printf("No return message received, waiting for a while\n");
+ printf("No return message received, waiting for a while. %d/%d\n", i+1, TRIES);
}
}
CHECK(count > 0);
@@ -176,7 +178,8 @@ TEST(PUBSUB_INT_GROUP, sendRecvTest) {
constexpr int COUNT = 50;
msg_t msg;
for (int i = 0; i < COUNT; ++i) {
- msg.seqNr = i;
+ msg.seqNr = i+1;
+ printf("Sending test msg %d of %d\n", i+1, COUNT);
pthread_mutex_lock(&g_act.mutex);
g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &msg);
pthread_mutex_unlock(&g_act.mutex);