You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/27 20:03:03 UTC

[7/8] celix git commit: Nanomsg

Nanomsg


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cdefb0d6
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cdefb0d6
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cdefb0d6

Branch: refs/heads/nanomsg
Commit: cdefb0d665b27a41f360599598ff489b322b4405
Parents: 883abee
Author: Erjan Altena <er...@gmail.com>
Authored: Mon Nov 26 20:23:11 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Mon Nov 26 20:23:11 2018 +0100

----------------------------------------------------------------------
 .../log_service/loghelper_include/log_helper.h  |  2 +-
 bundles/log_service/src/log_helper.c            |  2 +-
 .../src/pubsub_nanomsg_admin.cc                 | 62 ++++++++------------
 .../src/pubsub_nanomsg_admin.h                  | 10 ++--
 .../src/pubsub_nanomsg_topic_receiver.cc        | 10 ++--
 .../src/pubsub_nanomsg_topic_sender.cc          | 11 +---
 6 files changed, 42 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/loghelper_include/log_helper.h
----------------------------------------------------------------------
diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h
index 28e6877..af058eb 100644
--- a/bundles/log_service/loghelper_include/log_helper.h
+++ b/bundles/log_service/loghelper_include/log_helper.h
@@ -33,7 +33,7 @@ celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* log_he
 celix_status_t logHelper_start(log_helper_pt loghelper);
 celix_status_t logHelper_stop(log_helper_pt loghelper);
 celix_status_t logHelper_destroy(log_helper_pt* loghelper);
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... );
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... );
 #ifdef __cplusplus
 }
 #endif

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/src/log_helper.c
----------------------------------------------------------------------
diff --git a/bundles/log_service/src/log_helper.c b/bundles/log_service/src/log_helper.c
index 6570357..e9939ed 100644
--- a/bundles/log_service/src/log_helper.c
+++ b/bundles/log_service/src/log_helper.c
@@ -156,7 +156,7 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
 
 
 
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... )
 {
     celix_status_t status = CELIX_SUCCESS;
 	va_list listPointer;

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 030441d..cf516ee 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,20 +32,15 @@
 #include "pubsub_utils.h"
 #include "pubsub_nanomsg_admin.h"
 #include "pubsub_psa_nanomsg_constants.h"
-/*
-//#define L_DEBUG(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-//#define L_INFO(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-//#define L_WARN(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-//#define L_ERROR(...) \
-//    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
+
+#define L_DEBUG(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
 
 
@@ -125,10 +120,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
 
     {
         std::lock_guard<std::mutex> lock(serializers.mutex);
-        // todo: do not use pointer but type in map
-        for(auto kv: serializers.map) {
-            free(kv.second);
-        }
+        serializers.map.clear();
     }
 
     free(ipAddress);
@@ -229,11 +221,14 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
         std::lock_guard<std::mutex> lock(serializers.mutex);
         auto it = serializers.map.find(svcId);
         if (it == serializers.map.end()) {
-            auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
-            entry->serType = serType;
-            entry->svcId = svcId;
-            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-            serializers.map[svcId] = entry;
+            serializers.map.emplace(std::piecewise_construct,
+                    std::forward_as_tuple(svcId),
+                    std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc)));
+//            auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t)));
+//            entry->serType = serType;
+//            entry->svcId = svcId;
+//            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+//            serializers.map[svcId] = entry;
         }
     }
 }
@@ -250,18 +245,14 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
 
     std::lock_guard<std::mutex> lock(serializers.mutex);
 
-    psa_nanomsg_serializer_entry_t* entry = nullptr;
     auto kvsm = serializers.map.find(svcId);
     if (kvsm != serializers.map.end()) {
-        entry = kvsm->second;
-    }
-    serializers.map.erase(svcId);
-    if (entry != nullptr) {
+        auto &entry = kvsm->second;
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
                 for (auto kv: topicSenders.map) {
                 auto *sender = kv.second;
-                if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+                if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
                     char *key = kv.first;
                     topicSenders.map.erase(kv.first);
                     pubsub_nanoMsgTopicSender_destroy(sender);
@@ -274,7 +265,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
             for (auto kv : topicReceivers.map){
                 auto *receiver = kv.second;
-                if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) {
+                if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) {
                     auto key = kv.first;
                     topicReceivers.map.erase(key);
                     delete receiver;
@@ -282,7 +273,6 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
             }
         }
 
-        free(entry);
     }
 }
 
@@ -340,7 +330,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
         psa_nanomsg_serializer_entry_t *serEntry = nullptr;
         auto kv = serializers.map.find(serializerSvcId);
         if (kv != serializers.map.end()) {
-            serEntry = kv->second;
+            serEntry = &kv->second;
         }
         if (serEntry != nullptr) {
             sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
@@ -418,13 +408,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
             auto kvs = serializers.map.find(serializerSvcId);
             if (kvs != serializers.map.end()) {
                 auto serEntry = kvs->second;
-                receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc);
+                receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry.svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str());
             }
             if (receiver != nullptr) {
                 const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-                const char *serType = kvs->second->serType;
+                const char *serType = kvs->second.serType;
                 newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
                                                     serType, nullptr);
                 //if available also set container name
@@ -577,7 +567,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             pubsub_nanomsg_topic_sender_t *sender = kvts.second;
             long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
             auto kvs = serializers.map.find(serSvcId);
-            const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType;
+            const char* serType = ( kvs == serializers.map.end() ? "!Error" :  kvs->second.serType);
             const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
             const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
             const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -596,7 +586,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut
             pubsub::nanomsg::topic_receiver *receiver = entry.second;
             long serSvcId = receiver->serializerSvcId();
             auto kv =  serializers.map.find(serSvcId);
-            const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType;
+            const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType);
             auto scope = receiver->scope();
             auto topic = receiver->topic();
 

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index b33a3c0..689ae20 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -43,8 +43,6 @@
 
 #define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 
-//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
-
 template <typename key, typename value>
 struct ProtectedMap {
     std::mutex mutex{};
@@ -111,11 +109,16 @@ private:
     bool verbose{};
 
     typedef struct psa_nanomsg_serializer_entry {
+        psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) :
+            serType{_serType}, svcId{_svcId}, svc{_svc} {
+
+        }
+
         const char *serType;
         long svcId;
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
-    ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+    ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
     ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
     ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
@@ -131,4 +134,3 @@ extern "C" {
 
 
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H
-

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index db8469b..9f77a4c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -158,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
 
 
 void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
-    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+    L_DEBUG("[PSA_NANOMSG] TopicReceiver %s/%s connecting to nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
     auto entry  = requestedConnections.map.find(url);
@@ -181,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
 }
 
 void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
-    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url);
+    L_DEBUG("[PSA NANOMSG] TopicReceiver %s/%s disconnect from nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
     auto entry = requestedConnections.map.find(url);
@@ -316,13 +316,13 @@ void pubsub::nanomsg::topic_receiver::recvThread_exec() {
             processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header));
             nn_freemsg(msg);
         } else if (recvBytes >= 0) {
-            L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
+            L_ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
         } else if (errno == EAGAIN || errno == ETIMEDOUT) {
             //nop
         } else if (errno == EINTR) {
-            L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted");
+            L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
         } else {
-            L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: errno %d: %s\n", errno, strerror(errno));
+            L_WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno %d: %s\n", errno, strerror(errno));
         }
     } // while
 

http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index ff0d4f7..d5ed28f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -37,9 +37,9 @@
 #include "pubsub_psa_nanomsg_constants.h"
 #include "pubsub_nanomsg_common.h"
 
-#define FIRST_SEND_DELAY_IN_SECONDS             2
+#define FIRST_SEND_DELAY_IN_SECONDS                 2
 #define NANOMSG_BIND_MAX_RETRY                      10
-/*
+
 #define L_DEBUG(...) \
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -48,11 +48,6 @@
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 #define L_ERROR(...) \
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
 
 struct pubsub_nanomsg_topic_sender {
     celix_bundle_context_t *ctx;
@@ -349,7 +344,7 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId
     return status;
 }
 
-static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t */*sender*/) {
+static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) {
 
     static bool firstSend = true;