You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:02:59 UTC

[3/8] celix git commit: nanomsg Topic receiver to class

nanomsg Topic receiver to class


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/0abbf432
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/0abbf432
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/0abbf432

Branch: refs/heads/nanomsg
Commit: 0abbf4323838b5823b6f275ab418438727dfe289
Parents: c19a5bd
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Nov 21 20:34:12 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Nov 21 21:10:41 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_common.h                 |  4 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        | 90 ++++++--------------
 .../src/pubsub_nanomsg_topic_receiver.h         | 42 +++++----
 3 files changed, 47 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 28293a8..3d5d48d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -37,14 +37,14 @@
  */
 
 
-struct pubsub_zmq_msg_header {
+struct pubsub_nanomsg_msg_header {
     //header
     unsigned int type;
     unsigned char major;
     unsigned char minor;
 };
 
-typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t;
+typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t;
 
 
 int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);

http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 889d79d..88886c6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -17,6 +17,7 @@
  *under the License.
  */
 
+#include <iostream>
 #include <mutex>
 #include <memory.h>
 #include <vector>
@@ -61,34 +62,6 @@
 #define L_WARN printf
 #define L_ERROR printf
 
-struct pubsub_nanomsg_topic_receiver {
-    celix_bundle_context_t *ctx;
-    log_helper_t *logHelper;
-    long serializerSvcId;
-    pubsub_serializer_service_t *serializer;
-    char *scope;
-    char *topic;
-    char scopeAndTopicFilter[5];
-
-    int nanoMsgSocket;
-
-    struct {
-        celix_thread_t thread;
-        std::mutex mutex;
-        bool running;
-    } recvThread;
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
-    } requestedConnections;
-
-    long subscriberTrackerId;
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
-    } subscribers;
-};
 
 typedef struct psa_zmq_requested_connection_entry {
     char *url;
@@ -96,23 +69,14 @@ typedef struct psa_zmq_requested_connection_entry {
     int id;
 } psa_nanomsg_requested_connection_entry_t;
 
-typedef struct psa_zmq_subscriber_entry {
-    int usageCount;
-    hash_map_t *msgTypes; //map from serializer svc
-    pubsub_subscriber_t *svc;
-} psa_nanomsg_subscriber_entry_t;
 
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                      const celix_bundle_t *owner);
 static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
                                                          const celix_bundle_t *owner);
-static void* psa_nanomsg_recvThread(void *data);
 
 
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-//                                                                    log_helper_t *logHelper, const char *scope,
-//                                                                    const char *topic, long serializerSvcId,
-//                                                                    pubsub_serializer_service_t *serializer) {
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         log_helper_t *_logHelper,
         const char *_scope,
@@ -149,6 +113,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         m_topic = strndup(m_topic, 1024 * 1024);
 
         subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n";
         requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic);
@@ -159,15 +124,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = this;
-        opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
+        opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
         opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
 
         subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
         recvThread.running = true;
-        celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this);
-        std::stringstream namestream;
-        namestream << "NANOMSG TR " << m_scope << "/" << m_topic;
-        celixThread_setName(&recvThread.thread, namestream.str().c_str());
+
+        recvThread.thread = std::thread([this]() {this->recvThread_exec();});
     }
 }
 
@@ -177,7 +140,7 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
             std::lock_guard<std::mutex> _lock(recvThread.mutex);
             recvThread.running = false;
         }
-        celixThread_join(recvThread.thread, NULL);
+        recvThread.thread.join();
 
         celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 
@@ -285,12 +248,13 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
     }
 }
 
-static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
+                                                      const celix_bundle_t *bnd) {
+    auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
-    if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+    if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
         //not the same scope. ignore
         return;
     }
@@ -309,7 +273,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         if (rc == 0) {
             hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
         } else {
-            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
             free(entry);
         }
     }
@@ -317,7 +281,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
 
 static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
                                                          const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
-    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle);
+    auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
 
     long bndId = celix_bundle_getId(bnd);
 
@@ -331,13 +295,13 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
         int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
         if (rc != 0) {
-            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
         }
         free(entry);
     }
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
     pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)));
     pubsub_subscriber_t *svc = entry->svc;
 
@@ -353,7 +317,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
             } else {
-                L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+                //L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, scope, topic);
             }
         }
     } else {
@@ -361,13 +325,13 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r
     }
 }
 
-static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
-    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+    std::lock_guard<std::mutex> _lock(subscribers.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
         if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize);
+            processMsgForSubscriberEntry(entry, hdr, payload, payloadSize);
         }
     }
 }
@@ -377,12 +341,11 @@ struct Message {
     char payload[];
 };
 
-static void* psa_nanomsg_recvThread(void *data) {
-    pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data);
+void pubsub::nanomsg::topic_receiver::recvThread_exec() {
     bool running{};
     {
-        std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
-        running = receiver->recvThread.running;
+        std::lock_guard<std::mutex> _lock(recvThread.mutex);
+        running = recvThread.running;
     }
     while (running) {
         Message *msg = nullptr;
@@ -400,9 +363,9 @@ static void* psa_nanomsg_recvThread(void *data) {
         msgHdr.msg_controllen = 0;
 
         errno = 0;
-        int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
+        int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
         if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
-            processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header));
+            processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
             nn_freemsg(msg);
         } else if (recvBytes >= 0) {
             L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
@@ -415,5 +378,4 @@ static void* psa_nanomsg_recvThread(void *data) {
         }
     } // while
 
-    return NULL;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 6cd216b..f977917 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -16,15 +16,24 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
-#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#pragma once
 #include <string>
 #include <vector>
+#include <thread>
+#include <mutex>
 #include "pubsub_serializer.h"
 #include "log_helper.h"
 #include "celix_bundle_context.h"
+#include "pubsub_nanomsg_common.h"
+#include "pubsub/subscriber.h"
+
+typedef struct psa_zmq_subscriber_entry {
+    int usageCount;
+    hash_map_t *msgTypes; //map from serializer svc
+    pubsub_subscriber_t *svc;
+} psa_nanomsg_subscriber_entry_t;
+
 
-//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
 namespace pubsub {
     namespace nanomsg {
         class topic_receiver {
@@ -46,7 +55,10 @@ namespace pubsub {
             void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls);
             void connectTo(const char *url);
             void disconnectFrom(const char *url);
-        private:
+            void recvThread_exec();
+            void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize);
+            void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize);
+        //private:
             celix_bundle_context_t *ctx{nullptr};
             log_helper_t *logHelper{nullptr};
             long m_serializerSvcId{0};
@@ -58,7 +70,7 @@ namespace pubsub {
             int m_nanoMsgSocket{0};
 
             struct {
-                celix_thread_t thread;
+                std::thread thread;
                 std::mutex mutex;
                 bool running;
             } recvThread{};
@@ -74,22 +86,6 @@ namespace pubsub {
                 hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
             } subscribers{};
         };
-    }}
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
-//                                                                    log_helper_t *logHelper, const char *scope,
-//                                                                    const char *topic, long serializerSvcId,
-//                                                                    pubsub_serializer_service_t *serializer);
-//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
-//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
-
-//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
-//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
-//                                                 std::vector<std::string> &connectedUrls,
-//                                                 std::vector<std::string> &unconnectedUrls);
-
-//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
-//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+    }
+}
 
-#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H