You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/02/26 11:46:32 UTC

[celix] branch develop updated: CELIX-460: Adds support for configuring the thread prio/sched in psa zmq.

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new 525a0ed  CELIX-460: Adds support for configuring the thread prio/sched in psa zmq.
525a0ed is described below

commit 525a0edba7d34f7e3e63d693c71a47e50e684fa1
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue Feb 26 12:45:40 2019 +0100

    CELIX-460: Adds support for configuring the thread prio/sched in psa zmq.
---
 .../src/pubsub_psa_zmq_constants.h                 |  13 +++
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |   4 +-
 .../src/pubsub_zmq_topic_receiver.c                | 130 ++++++++++++++-------
 .../src/pubsub_zmq_topic_receiver.h                |   2 +-
 libs/framework/include/constants.h                 |   1 -
 libs/framework/src/celix_library_loader.c          |  10 +-
 libs/framework/src/framework.c                     |   8 +-
 7 files changed, 114 insertions(+), 54 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index ade3551..8f55d62 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -84,4 +84,17 @@
  */
 #define PUBSUB_ZMQ_STATIC_CONNECT_URLS    "zmq.static.connect.urls"
 
+/**
+ * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
+ * internal ZMQ threads.
+ */
+#define PUBSUB_ZMQ_THREAD_REALTIME_PRIO    "thread.realtime.prio"
+#define PUBSUB_ZMQ_THREAD_REALTIME_SHED    "thread.realtime.shed"
+
+/**
+ * High Water Mark option. See ZMQ doc for more information
+ * Note expected type is long
+ */
+#define PUBSUB_ZMQ_HWM                      "zmq.hwm"
+
 #endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index ce83c32..a61e533 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -447,8 +447,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
 
     celix_properties_t *newEndpoint = NULL;
 
-    const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL);
-
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -456,7 +454,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
     if (receiver == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, staticConnectUrls, serializerSvcId, serEntry->svc);
+            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
         } else {
             L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
         }
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 49e8aa5..c5c3530 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -61,7 +61,8 @@ struct pubsub_zmq_topic_receiver {
     char scopeAndTopicFilter[5];
     bool metricsEnabled;
 
-    zsock_t *zmqSocket;
+    void *zmqCtx;
+    void *zmqSock;
 
     struct {
         celix_thread_t thread;
@@ -119,6 +120,8 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
 static void* psa_zmq_recvThread(void * data);
 static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
 static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
+static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
+static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
 
 
 
@@ -126,7 +129,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
                                                               log_helper_t *logHelper,
                                                               const char *scope,
                                                               const char *topic,
-                                                              const char *staticConnectUrls,
+                                                              const celix_properties_t *topicProperties,
                                                               long serializerSvcId,
                                                               pubsub_serializer_service_t *serializer) {
     pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
@@ -134,6 +137,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->logHelper = logHelper;
     receiver->serializerSvcId = serializerSvcId;
     receiver->serializer = serializer;
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
     psa_zmq_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
@@ -175,40 +180,28 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
 
     const char* pub_key = zcert_public_txt(pub_cert);
 #endif
-    receiver->zmqSocket = zsock_new (ZMQ_SUB);
-    if (receiver->zmqSocket == NULL) {
-#ifdef BUILD_WITH_ZMQ_SECURITY
-        zcert_destroy(&sub_cert);
-        zcert_destroy(&pub_cert);
-#endif
+    receiver->zmqCtx = zmq_ctx_new();
+    if (receiver->zmqCtx != NULL) {
+        psa_zmq_setupZmqContext(receiver, topicProperties);
+        receiver->zmqSock = zmq_socket(receiver->zmqCtx, ZMQ_SUB);
+    } else {
+        //LOG ctx problem
+    }
+    if (receiver->zmqSock != NULL) {
+        psa_zmq_setupZmqSocket(receiver, topicProperties);
+    } else if (receiver->zmqCtx != NULL) {
+        //LOG sock problem
     }
 
-    if (receiver->zmqSocket != NULL) {
-        int timeout = PSA_ZMQ_RECV_TIMEOUT;
-        void *zmqSocket =  zsock_resolve(receiver->zmqSocket);
-        int res = zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
-        if (res) {
-            L_ERROR("[PSA_ZMQ] Cannot set ZMQ socket option ZMQ_RCVTIMEO errno=%d", errno);
-        }
-#ifdef BUILD_WITH_ZMQ_SECURITY
-
-        zcert_apply (sub_cert, zmq_s);
-        zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
-#endif
-        char subscribeFilter[5];
-        psa_zmq_setScopeAndTopicFilter(scope, topic, subscribeFilter);
-        zsock_set_subscribe(receiver->zmqSocket, subscribeFilter);
-
+    if (receiver->zmqSock == NULL) {
 #ifdef BUILD_WITH_ZMQ_SECURITY
-        ts->zmq_cert = sub_cert;
-	    ts->zmq_pub_cert = pub_cert;
+        zcert_destroy(&sub_cert);
+        zcert_destroy(&pub_cert);
 #endif
     }
 
-    if (receiver->zmqSocket != NULL) {
-        receiver->scope = strndup(scope, 1024 * 1024);
-        receiver->topic = strndup(topic, 1024 * 1024);
 
+    if (receiver->zmqSock != NULL) {
         celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
         celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
         celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -217,7 +210,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
-    if (receiver->zmqSocket != NULL && staticConnectUrls != NULL) {
+    const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL);
+    if (receiver->zmqSock != NULL && staticConnectUrls != NULL) {
         char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
         char* url;
         char* save = urlsCopy;
@@ -234,7 +228,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     }
 
     //track subscribers
-    if (receiver->zmqSocket != NULL ) {
+    if (receiver->zmqSock != NULL ) {
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
         char buf[size+1];
         snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -249,7 +243,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
 
-    if (receiver->zmqSocket != NULL ) {
+    if (receiver->zmqSock != NULL ) {
         receiver->recvThread.running = true;
         celixThread_create(&receiver->recvThread.thread, NULL, psa_zmq_recvThread, receiver);
         char name[64];
@@ -257,7 +251,9 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         celixThread_setName(&receiver->recvThread.thread, name);
     }
 
-    if (receiver->zmqSocket == NULL) {
+    if (receiver->zmqSock == NULL) {
+        free(receiver->scope);
+        free(receiver->topic);
         free(receiver);
         receiver = NULL;
         L_ERROR("[PSA_ZMQ] Cannot create TopicReceiver for %s/%s", scope, topic);
@@ -313,7 +309,8 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
         celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
         celixThreadMutex_destroy(&receiver->recvThread.mutex);
 
-        zsock_destroy(&receiver->zmqSocket);
+        zmq_close(receiver->zmqSock);
+        zmq_ctx_term(receiver->zmqCtx);
 
         free(receiver->scope);
         free(receiver->topic);
@@ -375,7 +372,7 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receive
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
     psa_zmq_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
     if (entry != NULL && entry->connected) {
-        if (zsock_disconnect(receiver->zmqSocket,"%s",url) == 0) {
+        if (zmq_disconnect(receiver->zmqSock, url) == 0) {
             entry->connected = false;
         } else {
             L_WARN("[PSA_ZMQ] Error disconnecting from zmq url %s. (%s)", url, strerror(errno));
@@ -511,7 +508,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             metrics->msgTypeId = hdr->type;
             metrics->maxDelayInSeconds = -INFINITY;
             metrics->minDelayInSeconds = INFINITY;
-            metrics->lastSeqNr = -1;
+            metrics->lastSeqNr = 0;
         }
 
         double diff = celix_difftime(&beginSer, &endSer);
@@ -527,10 +524,11 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
 
 
         int incr = hdr->seqNr - metrics->lastSeqNr;
-        if (incr > 1) {
+        if (metrics->lastSeqNr >0 && incr > 1) {
             metrics->nrOfMissingSeqNumbers += (incr - 1);
             L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
         }
+        metrics->lastSeqNr = hdr->seqNr;
 
         struct timespec sendTime;
         sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
@@ -585,7 +583,7 @@ static void* psa_zmq_recvThread(void * data) {
             psa_zmq_initializeAllSubscribers(receiver);
         }
 
-        zmsg_t *zmsg = zmsg_recv(receiver->zmqSocket);
+        zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
         if (zmsg != NULL) {
             if (zmsg_size(zmsg) != 3) {
                 L_WARN("[PSA_ZMQ_TR] Always expecting 2 frames per zmsg (header + payload), got %i frames", (int)zmsg_size(zmsg));
@@ -650,7 +648,7 @@ pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topi
         }
     }
 
-    result->nrOfMsgTypes = msgTypesCount;
+    result->nrOfMsgTypes = (unsigned long)msgTypesCount;
     result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
     int i = 0;
     iter = hashMapIterator_construct(receiver->subscribers.map);
@@ -703,7 +701,7 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
         while (hashMapIterator_hasNext(&iter)) {
             psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->connected){
-                if (zsock_connect(receiver->zmqSocket, "%s", entry->url) == 0) {
+                if (zmq_connect(receiver->zmqSock, entry->url) == 0) {
                     entry->connected = true;
                 } else {
                     L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", entry->url, strerror(errno));
@@ -740,3 +738,55 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
+
+static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties) {
+    long prio = celix_properties_getAsLong(topicProperties, PUBSUB_ZMQ_THREAD_REALTIME_PRIO, -1L);
+    if (prio > 0 && prio < 100) {
+        zmq_ctx_set(receiver->zmqCtx, ZMQ_THREAD_PRIORITY, (int)prio);
+    }
+
+    const char *shed = celix_properties_get(topicProperties, PUBSUB_ZMQ_THREAD_REALTIME_SHED, NULL);
+    if (shed != NULL) {
+        int policy = ZMQ_THREAD_SCHED_POLICY_DFLT;
+        if (strncmp("SCHED_OTHER", shed, 16) == 0) {
+            policy = SCHED_OTHER;
+        } else if (strncmp("SCHED_BATCH", shed, 16) == 0) {
+            policy = SCHED_BATCH;
+        } else if (strncmp("SCHED_IDLE", shed, 16) == 0) {
+            policy = SCHED_IDLE;
+        } else if (strncmp("SCHED_FIFO", shed, 16) == 0) {
+            policy = SCHED_FIFO;
+        } else if (strncmp("SCHED_RR", shed, 16) == 0) {
+            policy = SCHED_RR;
+        }
+        zmq_ctx_set(receiver->zmqCtx, ZMQ_THREAD_SCHED_POLICY, policy);
+    }
+}
+
+static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties) {
+    int timeout = PSA_ZMQ_RECV_TIMEOUT;
+    int res = zmq_setsockopt(receiver->zmqSock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
+    if (res) {
+        L_ERROR("[PSA_ZMQ] Cannot set ZMQ socket option ZMQ_RCVTIMEO errno=%d", errno);
+    }
+
+#ifdef ZMQ_HWM
+    long hwmProp = celix_properties_getAsLong(topicProperties, PUBSUB_ZMQ_HWM, -1L);
+    if (hwmProp >= 0) {
+        unsigned long hwm = (unsigned long)hwmProp;
+        zmq_setsockopt(receiver->zmqSock, ZMQ_HWM, &hwm, sizeof(hwm));
+    }
+#endif
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+
+    zcert_apply (sub_cert, zmq_s);
+    zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
+#endif
+    zsock_set_subscribe(receiver->zmqSock, receiver->scopeAndTopicFilter);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    ts->zmq_cert = sub_cert;
+    ts->zmq_pub_cert = pub_cert;
+#endif
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index d40b19d..a2d883f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -28,7 +28,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         log_helper_t *logHelper,
         const char *scope, 
         const char *topic,
-        const char *staticConnectUrls,
+        const celix_properties_t *topicProperties,
         long serializerSvcId,
         pubsub_serializer_service_t *serializer);
 void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
diff --git a/libs/framework/include/constants.h b/libs/framework/include/constants.h
index 98cfcbf..a228665 100644
--- a/libs/framework/include/constants.h
+++ b/libs/framework/include/constants.h
@@ -78,7 +78,6 @@ static const char *const CELIX_BUNDLES_PATH_NAME = "CELIX_BUNDLES_PATH";
 static const char *const CELIX_BUNDLES_PATH_DEFAULT = "bundles";
 
 static const char *const CELIX_LOAD_BUNDLES_WITH_NODELETE = "CELIX_LOAD_BUNDLES_WITH_NODELETE";
-static const bool CELIX_LOAD_BUNDLES_WITH_NODELETE_DEFAULT = false;
 
 #define CELIX_AUTO_START_0 "CELIX_AUTO_START_0"
 #define CELIX_AUTO_START_1 "CELIX_AUTO_START_1"
diff --git a/libs/framework/src/celix_library_loader.c b/libs/framework/src/celix_library_loader.c
index 95264f5..dc7c75f 100644
--- a/libs/framework/src/celix_library_loader.c
+++ b/libs/framework/src/celix_library_loader.c
@@ -22,17 +22,17 @@
 #include <dlfcn.h>
 
 celix_library_handle_t* celix_libloader_open(celix_bundle_context_t *ctx, const char *libPath) {
-    bool noDelete;
 #if defined(DEBUG) && !defined(ANDROID)
-    ctx = ctx; //use arg
-    noDelete = true;
+    bool def = true;
 #else
-    bool noDelete = celix_bundleContext_getPropertyAsBool(ctx, CELIX_LOAD_BUNDLES_WITH_NODELETE, CELIX_LOAD_BUNDLES_WITH_NODELETE_DEFAULT);
+    bool def = false;
 #endif
+    bool noDelete = celix_bundleContext_getPropertyAsBool(ctx, CELIX_LOAD_BUNDLES_WITH_NODELETE, def);
     if (noDelete) {
         return dlopen(libPath, RTLD_LAZY|RTLD_LOCAL|RTLD_NODELETE);
+    } else {
+        return dlopen(libPath, RTLD_LAZY|RTLD_LOCAL);
     }
-    return dlopen(libPath, RTLD_LAZY|RTLD_LOCAL);
 }
 
 
diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c
index 2e1c210..1e61138 100644
--- a/libs/framework/src/framework.c
+++ b/libs/framework/src/framework.c
@@ -1042,7 +1042,7 @@ celix_status_t framework_updateBundle(framework_pt framework, bundle_pt bundle,
         int i;
 	    for (i = arrayList_size(handles) - 1; i >= 0; i--) {
 	        void* handle = arrayList_get(handles, i);
-	        fw_closeLibrary(handle);
+	        celix_libloader_close(handle);
 	    }
     }
 
@@ -1237,8 +1237,8 @@ celix_status_t fw_uninstallBundle(framework_pt framework, bundle_pt bundle) {
 	status = CELIX_DO_IF(status, bundleRevision_getHandles(revision, &handles));
 	if(handles != NULL){
 		for (int i = arrayList_size(handles) - 1; i >= 0; i--) {
-			void *handle = arrayList_get(handles, i);
-			fw_closeLibrary(handle);
+			celix_library_handle_t *handle = arrayList_get(handles, i);
+			celix_libloader_close(handle);
 		}
 	}
 
@@ -2541,7 +2541,7 @@ static celix_status_t framework_loadLibraries(framework_pt framework, const char
             *activatorHandle = handle;
         }
         else if(handle!=NULL){
-            fw_closeLibrary(handle);
+            celix_libloader_close(handle);
         }
 
         token = strtok_r(NULL, ",", &last);