You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:01 UTC

[5/8] celix git commit: subscriber.map now std::map

subscriber.map now std::map


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/120895dd
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/120895dd
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/120895dd

Branch: refs/heads/nanomsg
Commit: 120895dd93c2c995d98327925fe88f939d345d12
Parents: 8658738
Author: Erjan Altena <er...@gmail.com>
Authored: Fri Nov 23 21:53:50 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Fri Nov 23 21:53:50 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_topic_receiver.cc        | 100 +++++++------------
 .../src/pubsub_nanomsg_topic_receiver.h         |  12 ++-
 2 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 8acf6b1..2205ed2 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,13 +63,6 @@
 #define L_ERROR printf
 
 
-
-
-
-//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-//                                                         const celix_bundle_t *owner);
-
-
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         log_helper_t *_logHelper,
         const char *_scope,
@@ -103,8 +96,8 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         m_scope = strndup(m_scope, 1024 * 1024);
         m_topic = strndup(m_topic, 1024 * 1024);
 
-        subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-        std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
+        //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
         //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -139,38 +132,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
 
         celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 
-        hash_map_iterator_t iter=hash_map_iterator_t();
         {
             std::lock_guard<std::mutex> _lock(subscribers.mutex);
-            iter = hashMapIterator_construct(subscribers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
-                if (entry != NULL)  {
-                    serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
-                    free(entry);
-                }
+            for(auto elem : subscribers.map) {
+                serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
             }
-            hashMap_destroy(subscribers.map, false, false);
+            subscribers.map.clear();
         }
-
-
-//        {
-//            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-//            iter = hashMapIterator_construct(requestedConnections.map);
-//            while (hashMapIterator_hasNext(&iter)) {
-//                psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
-//                if (entry != NULL) {
-//                    free(entry->url);
-//                    free(entry);
-//                }
-//            }
-//            hashMap_destroy(requestedConnections.map, false, false);
-//        }
-
-        //celixThreadMutex_destroy(&receiver->subscribers.mutex);
-        //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
-        //celixThreadMutex_destroy(&receiver->recvThread.mutex);
-
         nn_close(m_nanoMsgSocket);
 
         free((void*)m_scope);
@@ -211,6 +179,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
                 std::piecewise_construct,
                 std::forward_as_tuple(std::string(url)),
                 std::forward_as_tuple(url, -1));
+        entry = requestedConnections.map.find(url);
     }
     if (!entry->second.isConnected()) {
         int connection_id = nn_connect(m_nanoMsgSocket, url);
@@ -254,21 +223,26 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope
     }
 
     std::lock_guard<std::mutex> _lock(subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
-    if (entry != NULL) {
-        entry->usageCount += 1;
+    auto entry = subscribers.map.find(bndId);
+    if (entry != subscribers.map.end()) {
+        entry->second.usageCount += 1;
     } else {
         //new create entry
-        entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry)));
-        entry->usageCount = 1;
-        entry->svc = static_cast<pubsub_subscriber_t*>(svc);
+        subscribers.map.emplace(std::piecewise_construct,
+                std::forward_as_tuple(bndId),
+                std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1));
+        entry = subscribers.map.find(bndId);
+        if (entry == subscribers.map.end()) {
+            std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n";
+        }
 
-        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes);
         if (rc == 0) {
-            hashMap_put(subscribers.map, (void*)bndId, entry);
+            //hashMap_put(subscribers.map, (void*)bndId, entry);
+            //subscribers.map[bndId] = *entry;
         } else {
             L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic);
-            free(entry);
+            subscribers.map.erase(bndId);
         }
     }
 }
@@ -278,18 +252,17 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
     long bndId = celix_bundle_getId(bnd);
 
     std::lock_guard<std::mutex> _lock(subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId));
-    if (entry != NULL) {
-        entry->usageCount -= 1;
-    }
-    if (entry != NULL && entry->usageCount <= 0) {
-        //remove entry
-        hashMap_remove(subscribers.map, (void*)bndId);
-        int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes);
-        if (rc != 0) {
-            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+    auto entry = subscribers.map.find(bndId);
+    if (entry != subscribers.map.end()) {
+        entry->second.usageCount -= 1;
+        if (entry->second.usageCount <= 0) {
+            //remove entry
+            int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
+            if (rc != 0) {
+                L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic);
+            }
+            subscribers.map.erase(bndId);
         }
-        free(entry);
     }
 }
 
@@ -319,12 +292,13 @@ void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s
 
 void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
     std::lock_guard<std::mutex> _lock(subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
-        }
+    //hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
+    //while (hashMapIterator_hasNext(&iter)) {
+    for (auto entry : subscribers.map) {
+        //psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
+        //if (entry != NULL) {
+            processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
+        //}
     }
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 3398fb1..09d62a9 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -28,10 +28,13 @@
 #include "pubsub_nanomsg_common.h"
 #include "pubsub/subscriber.h"
 
-typedef struct psa_zmq_subscriber_entry {
+typedef struct psa_nanomsg_subscriber_entry {
+    psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
+    svc{_svc}, usageCount{_usageCount} {
+    }
+    pubsub_subscriber_t *svc{};
     int usageCount;
-    hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
+    hash_map_t *msgTypes{nullptr}; //map from serializer svc
 } psa_nanomsg_subscriber_entry_t;
 
 typedef struct psa_zmq_requested_connection_entry {
@@ -116,7 +119,8 @@ namespace pubsub {
             long subscriberTrackerId{0};
             struct {
                 std::mutex mutex;
-                hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+                std::map<long, psa_nanomsg_subscriber_entry_t> map;
+                //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
             } subscribers{};
         };
     }