You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/02/26 11:46:32 UTC
[celix] branch develop updated: CELIX-460: Adds support for
configuring the thread prio/sched in psa zmq.
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new 525a0ed CELIX-460: Adds support for configuring the thread prio/sched in psa zmq.
525a0ed is described below
commit 525a0edba7d34f7e3e63d693c71a47e50e684fa1
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Tue Feb 26 12:45:40 2019 +0100
CELIX-460: Adds support for configuring the thread prio/sched in psa zmq.
---
.../src/pubsub_psa_zmq_constants.h | 13 +++
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 4 +-
.../src/pubsub_zmq_topic_receiver.c | 130 ++++++++++++++-------
.../src/pubsub_zmq_topic_receiver.h | 2 +-
libs/framework/include/constants.h | 1 -
libs/framework/src/celix_library_loader.c | 10 +-
libs/framework/src/framework.c | 8 +-
7 files changed, 114 insertions(+), 54 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index ade3551..8f55d62 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -84,4 +84,17 @@
*/
#define PUBSUB_ZMQ_STATIC_CONNECT_URLS "zmq.static.connect.urls"
+/**
+ * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
+ * internal ZMQ threads.
+ */
+#define PUBSUB_ZMQ_THREAD_REALTIME_PRIO "thread.realtime.prio"
+#define PUBSUB_ZMQ_THREAD_REALTIME_SHED "thread.realtime.shed"
+
+/**
+ * High Water Mark option. See ZMQ doc for more information
+ * Note expected type is long
+ */
+#define PUBSUB_ZMQ_HWM "zmq.hwm"
+
#endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index ce83c32..a61e533 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -447,8 +447,6 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
celix_properties_t *newEndpoint = NULL;
- const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL);
-
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -456,7 +454,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, staticConnectUrls, serializerSvcId, serEntry->svc);
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 49e8aa5..c5c3530 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -61,7 +61,8 @@ struct pubsub_zmq_topic_receiver {
char scopeAndTopicFilter[5];
bool metricsEnabled;
- zsock_t *zmqSocket;
+ void *zmqCtx;
+ void *zmqSock;
struct {
celix_thread_t thread;
@@ -119,6 +120,8 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
static void* psa_zmq_recvThread(void * data);
static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
+static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
+static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
@@ -126,7 +129,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *staticConnectUrls,
+ const celix_properties_t *topicProperties,
long serializerSvcId,
pubsub_serializer_service_t *serializer) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
@@ -134,6 +137,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->logHelper = logHelper;
receiver->serializerSvcId = serializerSvcId;
receiver->serializer = serializer;
+ receiver->scope = strndup(scope, 1024 * 1024);
+ receiver->topic = strndup(topic, 1024 * 1024);
psa_zmq_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
@@ -175,40 +180,28 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char* pub_key = zcert_public_txt(pub_cert);
#endif
- receiver->zmqSocket = zsock_new (ZMQ_SUB);
- if (receiver->zmqSocket == NULL) {
-#ifdef BUILD_WITH_ZMQ_SECURITY
- zcert_destroy(&sub_cert);
- zcert_destroy(&pub_cert);
-#endif
+ receiver->zmqCtx = zmq_ctx_new();
+ if (receiver->zmqCtx != NULL) {
+ psa_zmq_setupZmqContext(receiver, topicProperties);
+ receiver->zmqSock = zmq_socket(receiver->zmqCtx, ZMQ_SUB);
+ } else {
+ //LOG ctx problem
+ }
+ if (receiver->zmqSock != NULL) {
+ psa_zmq_setupZmqSocket(receiver, topicProperties);
+ } else if (receiver->zmqCtx != NULL) {
+ //LOG sock problem
}
- if (receiver->zmqSocket != NULL) {
- int timeout = PSA_ZMQ_RECV_TIMEOUT;
- void *zmqSocket = zsock_resolve(receiver->zmqSocket);
- int res = zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
- if (res) {
- L_ERROR("[PSA_ZMQ] Cannot set ZMQ socket option ZMQ_RCVTIMEO errno=%d", errno);
- }
-#ifdef BUILD_WITH_ZMQ_SECURITY
-
- zcert_apply (sub_cert, zmq_s);
- zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
-#endif
- char subscribeFilter[5];
- psa_zmq_setScopeAndTopicFilter(scope, topic, subscribeFilter);
- zsock_set_subscribe(receiver->zmqSocket, subscribeFilter);
-
+ if (receiver->zmqSock == NULL) {
#ifdef BUILD_WITH_ZMQ_SECURITY
- ts->zmq_cert = sub_cert;
- ts->zmq_pub_cert = pub_cert;
+ zcert_destroy(&sub_cert);
+ zcert_destroy(&pub_cert);
#endif
}
- if (receiver->zmqSocket != NULL) {
- receiver->scope = strndup(scope, 1024 * 1024);
- receiver->topic = strndup(topic, 1024 * 1024);
+ if (receiver->zmqSock != NULL) {
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -217,7 +210,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
- if (receiver->zmqSocket != NULL && staticConnectUrls != NULL) {
+ const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL);
+ if (receiver->zmqSock != NULL && staticConnectUrls != NULL) {
char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
char* url;
char* save = urlsCopy;
@@ -234,7 +228,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
}
//track subscribers
- if (receiver->zmqSocket != NULL ) {
+ if (receiver->zmqSock != NULL ) {
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
char buf[size+1];
snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -249,7 +243,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
- if (receiver->zmqSocket != NULL ) {
+ if (receiver->zmqSock != NULL ) {
receiver->recvThread.running = true;
celixThread_create(&receiver->recvThread.thread, NULL, psa_zmq_recvThread, receiver);
char name[64];
@@ -257,7 +251,9 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
celixThread_setName(&receiver->recvThread.thread, name);
}
- if (receiver->zmqSocket == NULL) {
+ if (receiver->zmqSock == NULL) {
+ free(receiver->scope);
+ free(receiver->topic);
free(receiver);
receiver = NULL;
L_ERROR("[PSA_ZMQ] Cannot create TopicReceiver for %s/%s", scope, topic);
@@ -313,7 +309,8 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) {
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->recvThread.mutex);
- zsock_destroy(&receiver->zmqSocket);
+ zmq_close(receiver->zmqSock);
+ zmq_ctx_term(receiver->zmqCtx);
free(receiver->scope);
free(receiver->topic);
@@ -375,7 +372,7 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receive
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_zmq_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
if (entry != NULL && entry->connected) {
- if (zsock_disconnect(receiver->zmqSocket,"%s",url) == 0) {
+ if (zmq_disconnect(receiver->zmqSock, url) == 0) {
entry->connected = false;
} else {
L_WARN("[PSA_ZMQ] Error disconnecting from zmq url %s. (%s)", url, strerror(errno));
@@ -511,7 +508,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
metrics->msgTypeId = hdr->type;
metrics->maxDelayInSeconds = -INFINITY;
metrics->minDelayInSeconds = INFINITY;
- metrics->lastSeqNr = -1;
+ metrics->lastSeqNr = 0;
}
double diff = celix_difftime(&beginSer, &endSer);
@@ -527,10 +524,11 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
int incr = hdr->seqNr - metrics->lastSeqNr;
- if (incr > 1) {
+ if (metrics->lastSeqNr >0 && incr > 1) {
metrics->nrOfMissingSeqNumbers += (incr - 1);
L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
}
+ metrics->lastSeqNr = hdr->seqNr;
struct timespec sendTime;
sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
@@ -585,7 +583,7 @@ static void* psa_zmq_recvThread(void * data) {
psa_zmq_initializeAllSubscribers(receiver);
}
- zmsg_t *zmsg = zmsg_recv(receiver->zmqSocket);
+ zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
if (zmsg != NULL) {
if (zmsg_size(zmsg) != 3) {
L_WARN("[PSA_ZMQ_TR] Always expecting 2 frames per zmsg (header + payload), got %i frames", (int)zmsg_size(zmsg));
@@ -650,7 +648,7 @@ pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topi
}
}
- result->nrOfMsgTypes = msgTypesCount;
+ result->nrOfMsgTypes = (unsigned long)msgTypesCount;
result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
int i = 0;
iter = hashMapIterator_construct(receiver->subscribers.map);
@@ -703,7 +701,7 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->connected){
- if (zsock_connect(receiver->zmqSocket, "%s", entry->url) == 0) {
+ if (zmq_connect(receiver->zmqSock, entry->url) == 0) {
entry->connected = true;
} else {
L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", entry->url, strerror(errno));
@@ -740,3 +738,55 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
+
+static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties) {
+ long prio = celix_properties_getAsLong(topicProperties, PUBSUB_ZMQ_THREAD_REALTIME_PRIO, -1L);
+ if (prio > 0 && prio < 100) {
+ zmq_ctx_set(receiver->zmqCtx, ZMQ_THREAD_PRIORITY, (int)prio);
+ }
+
+ const char *shed = celix_properties_get(topicProperties, PUBSUB_ZMQ_THREAD_REALTIME_SHED, NULL);
+ if (shed != NULL) {
+ int policy = ZMQ_THREAD_SCHED_POLICY_DFLT;
+ if (strncmp("SCHED_OTHER", shed, 16) == 0) {
+ policy = SCHED_OTHER;
+ } else if (strncmp("SCHED_BATCH", shed, 16) == 0) {
+ policy = SCHED_BATCH;
+ } else if (strncmp("SCHED_IDLE", shed, 16) == 0) {
+ policy = SCHED_IDLE;
+ } else if (strncmp("SCHED_FIFO", shed, 16) == 0) {
+ policy = SCHED_FIFO;
+ } else if (strncmp("SCHED_RR", shed, 16) == 0) {
+ policy = SCHED_RR;
+ }
+ zmq_ctx_set(receiver->zmqCtx, ZMQ_THREAD_SCHED_POLICY, policy);
+ }
+}
+
+static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties) {
+ int timeout = PSA_ZMQ_RECV_TIMEOUT;
+ int res = zmq_setsockopt(receiver->zmqSock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
+ if (res) {
+ L_ERROR("[PSA_ZMQ] Cannot set ZMQ socket option ZMQ_RCVTIMEO errno=%d", errno);
+ }
+
+#ifdef ZMQ_HWM
+ long hwmProp = celix_properties_getAsLong(topicProperties, PUBSUB_ZMQ_HWM, -1L);
+ if (hwmProp >= 0) {
+ unsigned long hwm = (unsigned long)hwmProp;
+ zmq_setsockopt(receiver->zmqSock, ZMQ_HWM, &hwm, sizeof(hwm));
+ }
+#endif
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+
+ zcert_apply (sub_cert, zmq_s);
+ zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
+#endif
+ zsock_set_subscribe(receiver->zmqSock, receiver->scopeAndTopicFilter);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ ts->zmq_cert = sub_cert;
+ ts->zmq_pub_cert = pub_cert;
+#endif
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index d40b19d..a2d883f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -28,7 +28,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *staticConnectUrls,
+ const celix_properties_t *topicProperties,
long serializerSvcId,
pubsub_serializer_service_t *serializer);
void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
diff --git a/libs/framework/include/constants.h b/libs/framework/include/constants.h
index 98cfcbf..a228665 100644
--- a/libs/framework/include/constants.h
+++ b/libs/framework/include/constants.h
@@ -78,7 +78,6 @@ static const char *const CELIX_BUNDLES_PATH_NAME = "CELIX_BUNDLES_PATH";
static const char *const CELIX_BUNDLES_PATH_DEFAULT = "bundles";
static const char *const CELIX_LOAD_BUNDLES_WITH_NODELETE = "CELIX_LOAD_BUNDLES_WITH_NODELETE";
-static const bool CELIX_LOAD_BUNDLES_WITH_NODELETE_DEFAULT = false;
#define CELIX_AUTO_START_0 "CELIX_AUTO_START_0"
#define CELIX_AUTO_START_1 "CELIX_AUTO_START_1"
diff --git a/libs/framework/src/celix_library_loader.c b/libs/framework/src/celix_library_loader.c
index 95264f5..dc7c75f 100644
--- a/libs/framework/src/celix_library_loader.c
+++ b/libs/framework/src/celix_library_loader.c
@@ -22,17 +22,17 @@
#include <dlfcn.h>
celix_library_handle_t* celix_libloader_open(celix_bundle_context_t *ctx, const char *libPath) {
- bool noDelete;
#if defined(DEBUG) && !defined(ANDROID)
- ctx = ctx; //use arg
- noDelete = true;
+ bool def = true;
#else
- bool noDelete = celix_bundleContext_getPropertyAsBool(ctx, CELIX_LOAD_BUNDLES_WITH_NODELETE, CELIX_LOAD_BUNDLES_WITH_NODELETE_DEFAULT);
+ bool def = false;
#endif
+ bool noDelete = celix_bundleContext_getPropertyAsBool(ctx, CELIX_LOAD_BUNDLES_WITH_NODELETE, def);
if (noDelete) {
return dlopen(libPath, RTLD_LAZY|RTLD_LOCAL|RTLD_NODELETE);
+ } else {
+ return dlopen(libPath, RTLD_LAZY|RTLD_LOCAL);
}
- return dlopen(libPath, RTLD_LAZY|RTLD_LOCAL);
}
diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c
index 2e1c210..1e61138 100644
--- a/libs/framework/src/framework.c
+++ b/libs/framework/src/framework.c
@@ -1042,7 +1042,7 @@ celix_status_t framework_updateBundle(framework_pt framework, bundle_pt bundle,
int i;
for (i = arrayList_size(handles) - 1; i >= 0; i--) {
void* handle = arrayList_get(handles, i);
- fw_closeLibrary(handle);
+ celix_libloader_close(handle);
}
}
@@ -1237,8 +1237,8 @@ celix_status_t fw_uninstallBundle(framework_pt framework, bundle_pt bundle) {
status = CELIX_DO_IF(status, bundleRevision_getHandles(revision, &handles));
if(handles != NULL){
for (int i = arrayList_size(handles) - 1; i >= 0; i--) {
- void *handle = arrayList_get(handles, i);
- fw_closeLibrary(handle);
+ celix_library_handle_t *handle = arrayList_get(handles, i);
+ celix_libloader_close(handle);
}
}
@@ -2541,7 +2541,7 @@ static celix_status_t framework_loadLibraries(framework_pt framework, const char
*activatorHandle = handle;
}
else if(handle!=NULL){
- fw_closeLibrary(handle);
+ celix_libloader_close(handle);
}
token = strtok_r(NULL, ",", &last);