You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:01 UTC
[5/8] celix git commit: subscriber.map now std::map
subscriber.map now std::map
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/120895dd
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/120895dd
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/120895dd
Branch: refs/heads/nanomsg
Commit: 120895dd93c2c995d98327925fe88f939d345d12
Parents: 8658738
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 21:53:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 21:53:50 2018 +0100
----------------------------------------------------------------------
.../src/pubsub_nanomsg_topic_receiver.cc | 100 +++++++------------
.../src/pubsub_nanomsg_topic_receiver.h | 12 ++-
2 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 8acf6b1..2205ed2 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,13 +63,6 @@
#define L_ERROR printf
-
-
-
-//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-// const celix_bundle_t *owner);
-
-
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper,
const char *_scope,
@@ -103,8 +96,8 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
m_scope = strndup(m_scope, 1024 * 1024);
m_topic = strndup(m_topic, 1024 * 1024);
- subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
+ //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
//requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -139,38 +132,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
- hash_map_iterator_t iter=hash_map_iterator_t();
{
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- iter = hashMapIterator_construct(subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
- free(entry);
- }
+ for(auto elem : subscribers.map) {
+ serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
}
- hashMap_destroy(subscribers.map, false, false);
+ subscribers.map.clear();
}
-
-
-// {
-// std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-// iter = hashMapIterator_construct(requestedConnections.map);
-// while (hashMapIterator_hasNext(&iter)) {
-// psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
-// if (entry != NULL) {
-// free(entry->url);
-// free(entry);
-// }
-// }
-// hashMap_destroy(requestedConnections.map, false, false);
-// }
-
- //celixThreadMutex_destroy(&receiver->subscribers.mutex);
- //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
- //celixThreadMutex_destroy(&receiver->recvThread.mutex);
-
nn_close(m_nanoMsgSocket);
free((void*)m_scope);
@@ -211,6 +179,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
std::piecewise_construct,
std::forward_as_tuple(std::string(url)),
std::forward_as_tuple(url, -1));
+ entry = requestedConnections.map.find(url);
}
if (!entry->second.isConnected()) {
int connection_id = nn_connect(m_nanoMsgSocket, url);
@@ -254,21 +223,26 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
}
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
- if (entry != NULL) {
- entry->usageCount += 1;
+ auto entry = subscribers.map.find(bndId);
+ if (entry != subscribers.map.end()) {
+ entry->second.usageCount += 1;
} else {
//new create entry
- entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry)));
- entry->usageCount = 1;
- entry->svc = static_cast<pubsub_subscriber_t*>(svc);
+ subscribers.map.emplace(std::piecewise_construct,
+ std::forward_as_tuple(bndId),
+ std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
+ entry = subscribers.map.find(bndId);
+ if (entry == subscribers.map.end()) {
+ std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
+ }
- int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+ int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
if (rc == 0) {
- hashMap_put(subscribers.map, (void*)bndId, entry);
+ //hashMap_put(subscribers.map, (void*)bndId, entry);
+ //subscribers.map[bndId] = *entry;
} else {
L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
- free(entry);
+ subscribers.map.erase(bndId);
}
}
}
@@ -278,18 +252,17 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
long bndId = celix_bundle_getId(bnd);
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
- if (entry != NULL) {
- entry->usageCount -= 1;
- }
- if (entry != NULL && entry->usageCount <= 0) {
- //remove entry
- hashMap_remove(subscribers.map, (void*)bndId);
- int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
- if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+ auto entry = subscribers.map.find(bndId);
+ if (entry != subscribers.map.end()) {
+ entry->second.usageCount -= 1;
+ if (entry->second.usageCount <= 0) {
+ //remove entry
+ int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
+ if (rc != 0) {
+ L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+ }
+ subscribers.map.erase(bndId);
}
- free(entry);
}
}
@@ -319,12 +292,13 @@ void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s
void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
- if (entry != NULL) {
- processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
- }
+ //hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
+ //while (hashMapIterator_hasNext(&iter)) {
+ for (auto entry : subscribers.map) {
+ //psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
+ //if (entry != NULL) {
+ processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
+ //}
}
}
http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 3398fb1..09d62a9 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,10 +28,13 @@
#include "pubsub_nanomsg_common.h"
#include "pubsub/subscriber.h"
-typedef struct psa_zmq_subscriber_entry {
+typedef struct psa_nanomsg_subscriber_entry {
+ psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
+ svc{_svc}, usageCount{_usageCount} {
+ }
+ pubsub_subscriber_t *svc{};
int usageCount;
- hash_map_t *msgTypes; //map from serializer svc
- pubsub_subscriber_t *svc;
+ hash_map_t *msgTypes{nullptr}; //map from serializer svc
} psa_nanomsg_subscriber_entry_t;
typedef struct psa_zmq_requested_connection_entry {
@@ -116,7 +119,8 @@ namespace pubsub {
long subscriberTrackerId{0};
struct {
std::mutex mutex;
- hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ std::map<long, psa_nanomsg_subscriber_entry_t> map;
+ //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
} subscribers{};
};
}