You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/11/01 19:43:20 UTC
celix git commit: Updates to nanomsg admin
Repository: celix
Updated Branches:
refs/heads/nanomsg 4fc1f3d3f -> cb740b0d4
Updates to nanomsg admin
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cb740b0d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cb740b0d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cb740b0d
Branch: refs/heads/nanomsg
Commit: cb740b0d41df786fead29ab1d9f3930187a7fe82
Parents: 4fc1f3d
Author: Erjan Altena <er...@gmail.com>
Authored: Thu Nov 1 20:43:05 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Thu Nov 1 20:43:05 2018 +0100
----------------------------------------------------------------------
.../pubsub_admin_nanomsg/src/nanomsg_crypto.cc | 281 ---------
.../pubsub_admin_nanomsg/src/nanomsg_crypto.h | 41 --
.../src/psa_nanomsg_activator.cc | 171 +++---
.../src/pubsub_nanomsg_admin.cc | 573 ++++++++++---------
.../src/pubsub_nanomsg_admin.h | 110 +++-
.../src/pubsub_nanomsg_topic_receiver.cc | 21 +-
.../src/pubsub_nanomsg_topic_sender.cc | 2 -
libs/framework/include/celix_bundle_activator.h | 2 +-
8 files changed, 476 insertions(+), 725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
deleted file mode 100644
index d7d88bb..0000000
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.c
- *
- * \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include "nanomsg_crypto.h"
-
-#include <zmq.h>
-#include <openssl/conf.h>
-#include <openssl/evp.h>
-#include <openssl/err.h>
-
-#include <string.h>
-
-#define MAX_FILE_PATH_LENGTH 512
-#define ZMQ_KEY_LENGTH 40
-#define AES_KEY_LENGTH 32
-#define AES_IV_LENGTH 16
-
-#define KEY_TO_GET "aes_key"
-#define IV_TO_GET "aes_iv"
-
-static char* read_file_content(char* filePath, char* fileName);
-static void parse_key_lines(char *keysBuffer, char **key, char **iv);
-static void parse_key_line(char *line, char **key, char **iv);
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
-
-/**
- * Return a valid zcert_t from an encoded file
- * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
- */
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
-{
-
- if (keysFilePath == NULL){
- keysFilePath = DEFAULT_KEYS_FILE_PATH;
- }
-
- if (keysFileName == NULL){
- keysFileName = DEFAULT_KEYS_FILE_NAME;
- }
-
- char* keys_data = read_file_content(keysFilePath, keysFileName);
- if (keys_data == NULL){
- return NULL;
- }
-
- char *key = NULL;
- char *iv = NULL;
- parse_key_lines(keys_data, &key, &iv);
- free(keys_data);
-
- if (key == NULL || iv == NULL){
- free(key);
- free(iv);
-
- printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
- return NULL;
- }
-
- //At this point, we know an aes key and iv are stored and loaded
-
- // generate sha256 hashes
- unsigned char key_digest[EVP_MAX_MD_SIZE];
- unsigned char iv_digest[EVP_MAX_MD_SIZE];
- generate_sha256_hash((char*) key, key_digest);
- generate_sha256_hash((char*) iv, iv_digest);
-
- zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
- if (encoded_secret == NULL){
- free(key);
- free(iv);
-
- return NULL;
- }
-
- int encoded_secret_size = (int) zchunk_size (encoded_secret);
- char* encoded_secret_data = zchunk_strdup(encoded_secret);
- zchunk_destroy (&encoded_secret);
-
- // Decryption of data
- int decryptedtext_len;
- unsigned char decryptedtext[encoded_secret_size];
- decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
- decryptedtext[decryptedtext_len] = '\0';
-
- EVP_cleanup();
-
- free(encoded_secret_data);
- free(key);
- free(iv);
-
- // The public and private keys are retrieved
- char* public_text = NULL;
- char* secret_text = NULL;
-
- extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
-
- byte public_key [32] = { 0 };
- byte secret_key [32] = { 0 };
-
- zmq_z85_decode (public_key, public_text);
- zmq_z85_decode (secret_key, secret_text);
-
- zcert_t* cert_loaded = zcert_new_from(public_key, secret_key);
-
- free(public_text);
- free(secret_text);
-
- return cert_loaded;
-}
-
-int generate_sha256_hash(char* text, unsigned char* digest)
-{
- unsigned int digest_len;
-
- EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
- EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(mdctx, text, strlen(text));
- EVP_DigestFinal_ex(mdctx, digest, &digest_len);
- EVP_MD_CTX_free(mdctx);
-
- return digest_len;
-}
-
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
-{
- int len;
- int plaintext_len;
-
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
-
- EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
- plaintext_len = len;
- EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
- plaintext_len += len;
-
- EVP_CIPHER_CTX_free(ctx);
-
- return plaintext_len;
-}
-
-/**
- * Caller is responsible for freeing the returned value
- */
-static char* read_file_content(char* filePath, char* fileName){
-
- char fileNameWithPath[MAX_FILE_PATH_LENGTH];
- snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
- int rc = 0;
-
- if (!zsys_file_exists(fileNameWithPath)){
- printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
- return NULL;
- }
-
- zfile_t* keys_file = zfile_new (filePath, fileName);
- rc = zfile_input (keys_file);
- if (rc != 0){
- zfile_destroy(&keys_file);
- printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
- return NULL;
- }
-
- ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
- zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
- if (keys_chunk == NULL){
- zfile_close(keys_file);
- zfile_destroy(&keys_file);
- printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
- return NULL;
- }
-
- char* keys_data = zchunk_strdup(keys_chunk);
- zchunk_destroy(&keys_chunk);
- zfile_close(keys_file);
- zfile_destroy (&keys_file);
-
- return keys_data;
-}
-
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
- char *line = NULL, *saveLinePointer = NULL;
-
- bool firstTime = true;
- do {
- if (firstTime){
- line = strtok_r(keysBuffer, "\n", &saveLinePointer);
- firstTime = false;
- }else {
- line = strtok_r(NULL, "\n", &saveLinePointer);
- }
-
- if (line == NULL){
- break;
- }
-
- parse_key_line(line, key, iv);
-
- } while((*key == NULL || *iv == NULL) && line != NULL);
-
-}
-
-static void parse_key_line(char *line, char **key, char **iv){
- char *detectedKey = NULL, *detectedValue= NULL;
-
- char* sep_at = strchr(line, ':');
- if (sep_at == NULL){
- return;
- }
-
- *sep_at = '\0'; // overwrite first separator, creating two strings.
- detectedKey = line;
- detectedValue = sep_at + 1;
-
- if (detectedKey == NULL || detectedValue == NULL){
- return;
- }
- if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
- return;
- }
-
- if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
- *key = strndup(detectedValue, AES_KEY_LENGTH);
- } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
- *iv = strndup(detectedValue, AES_IV_LENGTH);
- }
-}
-
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
- // Load decrypted text buffer
- zchunk_t* secret_decrypted = zchunk_new(input, inputlen);
- if (secret_decrypted == NULL){
- printf("CRYPTO: Failed to create zchunk\n");
- return;
- }
-
- zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted);
- zchunk_destroy (&secret_decrypted);
- if (secret_config == NULL){
- printf("CRYPTO: Failed to create zconfig\n");
- return;
- }
-
- // Extract public and secret key from text buffer
- char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL);
- char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL);
-
- if (public_text == NULL || secret_text == NULL){
- zconfig_destroy(&secret_config);
- printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
- return;
- }
-
- *publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1);
- *secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
-
- zconfig_destroy(&secret_config);
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
deleted file mode 100644
index f1a990f..0000000
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.h
- *
- * \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef ZMQ_CRYPTO_H_
-#define ZMQ_CRYPTO_H_
-
-#include <czmq.h>
-
-#define PROPERTY_KEYS_FILE_PATH "keys.file.path"
-#define PROPERTY_KEYS_FILE_NAME "keys.file.name"
-#define DEFAULT_KEYS_FILE_PATH "/etc/"
-#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
-
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path);
-int generate_sha256_hash(char* text, unsigned char* digest);
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
-
-#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
index 79ea1d4..ec3ee7d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -19,95 +19,104 @@
#include <stdlib.h>
-
+#include <new>
+#include <iostream>
#include "celix_api.h"
#include "pubsub_serializer.h"
#include "log_helper.h"
#include "pubsub_admin.h"
#include "pubsub_nanomsg_admin.h"
-#include "../../../shell/shell/include/command.h"
-
-typedef struct psa_nanomsg_activator {
- log_helper_t *logHelper;
-
- pubsub_nanomsg_admin_t *admin;
-
- long serializersTrackerId;
-
- pubsub_admin_service_t adminService;
- long adminSvcId;
-
- command_service_t cmdSvc;
- long cmdSvcId;
-} psa_nanomsg_activator_t;
-
-int psa_nanomsg_start(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
- act->adminSvcId = -1L;
- act->cmdSvcId = -1L;
- act->serializersTrackerId = -1L;
-
- logHelper_create(ctx, &act->logHelper);
- logHelper_start(act->logHelper);
-
- act->admin = pubsub_nanoMsgAdmin_create(ctx, act->logHelper);
- celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
-
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts{};
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_nanoMsgAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_nanoMsgAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
- //register pubsub admin service
- if (status == CELIX_SUCCESS) {
- pubsub_admin_service_t *psaSvc = &act->adminService;
- psaSvc->handle = act->admin;
- psaSvc->matchPublisher = pubsub_nanoMsgAdmin_matchPublisher;
- psaSvc->matchSubscriber = pubsub_nanoMsgAdmin_matchSubscriber;
- psaSvc->matchEndpoint = pubsub_nanoMsgAdmin_matchEndpoint;
- psaSvc->setupTopicSender = pubsub_nanoMsgAdmin_setupTopicSender;
- psaSvc->teardownTopicSender = pubsub_nanoMsgAdmin_teardownTopicSender;
- psaSvc->setupTopicReceiver = pubsub_nanoMsgAdmin_setupTopicReceiver;
- psaSvc->teardownTopicReceiver = pubsub_nanoMsgAdmin_teardownTopicReceiver;
- psaSvc->addEndpoint = pubsub_nanoMsgAdmin_addEndpoint;
- psaSvc->removeEndpoint = pubsub_nanoMsgAdmin_removeEndpoint;
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
-
- act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
- }
-
- //register shell command service
- {
- act->cmdSvc.handle = act->admin;
- act->cmdSvc.executeCommand = pubsub_nanoMsgAdmin_executeCommand;
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
- celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
- celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
- act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
- }
-
- return status;
-}
-int psa_nanomsg_stop(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_unregisterService(ctx, act->adminSvcId);
- celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
- celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
- pubsub_nanoMsgAdmin_destroy(act->admin);
+class LogHelper {
+public:
+ LogHelper(celix_bundle_context_t *ctx) : context{ctx} {
+ if (logHelper_create(context, &logHelper)!= CELIX_SUCCESS) {
+ std::bad_alloc{};
+ }
+
+ }
+ ~LogHelper() {
+ logHelper_destroy(&logHelper);
+ }
+
+ LogHelper(const LogHelper &) = delete;
+ LogHelper & operator=(const LogHelper&) = delete;
+ celix_status_t start () {
+ return logHelper_start(logHelper);
+ }
+
+ celix_status_t stop () {
+ return logHelper_stop(logHelper);
+ }
+
+ log_helper_t *get() {
+ return logHelper;
+ }
+private:
+ celix_bundle_context_t *context;
+ log_helper_t *logHelper{};
+
+};
+
+class psa_nanomsg_activator {
+public:
+ psa_nanomsg_activator(celix_bundle_context_t *ctx) : context{ctx}, logHelper{context}, admin(context, logHelper.get()) {
+ }
+ psa_nanomsg_activator(const psa_nanomsg_activator&) = delete;
+ psa_nanomsg_activator& operator=(const psa_nanomsg_activator&) = delete;
+
+ ~psa_nanomsg_activator() {
+
+ }
+
+ celix_status_t start() {
+ admin.start();
+ auto status = logHelper.start();
+
+ return status;
+ }
+
+ celix_status_t stop() {
+ admin.stop();
+ return logHelper.stop();
+ };
+
+private:
+ celix_bundle_context_t *context{};
+ LogHelper logHelper;
+ pubsub_nanomsg_admin admin;
+
+
+// command_service_t cmdSvc{};
+
+// long cmdSvcId = -1L;
+};
+
+celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) {
+ celix_status_t status = CELIX_SUCCESS;
+ auto data = new (std::nothrow) psa_nanomsg_activator{ctx};
+ if (data != NULL) {
+ *userData = data;
+ } else {
+ status = CELIX_ENOMEM;
+ }
+ return status;
+}
- logHelper_stop(act->logHelper);
- logHelper_destroy(&act->logHelper);
+celix_status_t celix_bundleActivator_start(void *userData, celix_bundle_context_t *) {
+ auto act = static_cast<psa_nanomsg_activator*>(userData);
+ return act->start();
+}
- return CELIX_SUCCESS;
+celix_status_t celix_bundleActivator_stop(void *userData, celix_bundle_context_t *) {
+ auto act = static_cast<psa_nanomsg_activator*>(userData);
+ return act->stop();
}
-CELIX_GEN_BUNDLE_ACTIVATOR(psa_nanomsg_activator_t, psa_nanomsg_start, psa_nanomsg_stop);
+
+celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_context_t *) {
+ auto act = static_cast<psa_nanomsg_activator*>(userData);
+ delete act;
+ return CELIX_SUCCESS;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index f3e6831..bd1d0a5 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -19,7 +19,9 @@
#include <string>
#include <vector>
+#include <functional>
#include <memory.h>
+#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
@@ -48,43 +50,6 @@
#define L_WARN printf
#define L_ERROR printf
-struct pubsub_nanomsg_admin {
- celix_bundle_context_t *ctx;
- log_helper_t *log;
- const char *fwUUID;
-
- char* ipAddress;
-
- unsigned int basePort;
- unsigned int maxPort;
-
- double qosSampleScore;
- double qosControlScore;
- double defaultScore;
-
- bool verbose;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
- } serializers;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
- } topicSenders;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
- } topicReceivers;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
- } discoveredEndpoints;
-
-};
typedef struct psa_nanomsg_serializer_entry {
const char *serType;
@@ -93,164 +58,221 @@ typedef struct psa_nanomsg_serializer_entry {
} psa_nanomsg_serializer_entry_t;
static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
-static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t *psa,
- pubsub_nanomsg_topic_receiver_t *receiver,
- const celix_properties_t *endpoint);
-static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t *psa,
- pubsub_nanomsg_topic_receiver_t *receiver,
- const celix_properties_t *endpoint);
-
-
-pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(calloc(1, sizeof(*psa)));
- psa->ctx = ctx;
- psa->log = logHelper;
- psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
- psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
-
- char *ip = NULL;
- const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , NULL);
- if (confIp != NULL) {
+
+pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_helper_t *logHelper):
+ ctx{_ctx},
+ log{logHelper} {
+ verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
+ fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, nullptr);
+
+ char *ip = nullptr;
+ const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr);
+ if (confIp != nullptr) {
ip = strndup(confIp, 1024);
}
- if (ip == NULL) {
+ if (ip == nullptr) {
//TODO try to get ip from subnet (CIDR)
}
- if (ip == NULL) {
+ if (ip == nullptr) {
//try to get ip from itf
- const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, NULL);
+ const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr);
nanoMsg_getIpAddress(interface, &ip);
}
- if (ip == NULL) {
+ if (ip == nullptr) {
L_WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (%s)", PUBSUB_NANOMSG_DEFAULT_IP);
ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
}
- psa->ipAddress = ip;
- if (psa->verbose) {
+ ipAddress = ip;
+ if (verbose) {
L_INFO("[PSA_NANOMSG] Using %s for service annunciation", ip);
}
- long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
- long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
- psa->basePort = (unsigned int)basePort;
- psa->maxPort = (unsigned int)maxPort;
- if (psa->verbose) {
- L_INFO("[PSA_NANOMSG] Using base till max port: %i till %i", psa->basePort, psa->maxPort);
+ long _basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
+ long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
+ basePort = (unsigned int)_basePort;
+ maxPort = (unsigned int)_maxPort;
+ if (verbose) {
+ L_INFO("[PSA_NANOMSG] Using base till max port: %li till %li", _basePort, _maxPort);
}
-// long nrThreads = celix_bundleContext_getPropertyAsLong(ctx, PUBSUB_NANOMSG_NR_THREADS_KEY, 0);
-// if (nrThreads > 0) {
-// zsys_set_io_threads((size_t)nrThreads);
-// L_INFO("[PSA_NANOMSG] Using %d threads for NanoMsg", (size_t)nrThreads);
-// }
-
-
- psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
- psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
- psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
+ defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
+ qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
+ qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadMutex_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ celixThreadMutex_create(&serializers.mutex, nullptr);
+ serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
- celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
- psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&topicSenders.mutex, nullptr);
+ topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
- celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
- psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&topicReceivers.mutex, nullptr);
+ topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
- celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
- psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
- return psa;
+ celixThreadMutex_create(&discoveredEndpoints.mutex, nullptr);
+ discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
}
-void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa) {
- if (psa == NULL) {
- return;
- }
-
+pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
//note assuming al psa register services and service tracker are removed.
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ celixThreadMutex_lock(&topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
+ auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
pubsub_nanoMsgTopicSender_destroy(sender);
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&topicSenders.mutex);
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_nanomsg_topic_receiver_t *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+ auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
pubsub_nanoMsgTopicReceiver_destroy(recv);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex);
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ celixThreadMutex_lock(&discoveredEndpoints.mutex);
+ iter = hashMapIterator_construct(discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
- celix_properties_t *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+ auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
celix_properties_destroy(ep);
}
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_unlock(&discoveredEndpoints.mutex);
- celixThreadMutex_lock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
+ celixThreadMutex_lock(&serializers.mutex);
+ iter = hashMapIterator_construct(serializers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
+ auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
free(entry);
}
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
- celixThreadMutex_destroy(&psa->topicSenders.mutex);
- hashMap_destroy(psa->topicSenders.map, true, false);
+ celixThreadMutex_destroy(&topicSenders.mutex);
+ hashMap_destroy(topicSenders.map, true, false);
- celixThreadMutex_destroy(&psa->topicReceivers.mutex);
- hashMap_destroy(psa->topicReceivers.map, true, false);
+ celixThreadMutex_destroy(&topicReceivers.mutex);
+ hashMap_destroy(topicReceivers.map, true, false);
- celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
- hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+ celixThreadMutex_destroy(&discoveredEndpoints.mutex);
+ hashMap_destroy(discoveredEndpoints.map, false, false);
- celixThreadMutex_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&serializers.mutex);
+ hashMap_destroy(serializers.map, false, false);
- free(psa->ipAddress);
+ free(ipAddress);
- free(psa);
}
-void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+void pubsub_nanomsg_admin::start() {
+ adminService.handle = this;
+ adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->matchPublisher(svcRequesterBndId, svcFilter,score, serializerSvcId);
+ };
+ adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->matchSubscriber(svcProviderBndId, svcProperties, score, serializerSvcId);
+ };
+ adminService.matchEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->matchEndpoint(endpoint, match);
+ };
+ adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->setupTopicSender(scope, topic, serializerSvcId, publisherEndpoint);
+ };
+ adminService.teardownTopicSender = [](void *handle, const char *scope, const char *topic) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->teardownTopicSender(scope, topic);
+ };
+ adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->setupTopicReceiver(scope, topic,serializerSvcId, subscriberEndpoint);
+ };
+
+ adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->teardownTopicReceiver(scope, topic);
+ };
+ adminService.addEndpoint = [](void *handle, const celix_properties_t *endpoint) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->addEndpoint(endpoint);
+ };
+ adminService.removeEndpoint = [](void *handle, const celix_properties_t *endpoint) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->removeEndpoint(endpoint);
+ };
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
+
+ adminSvcId = celix_bundleContext_registerService(ctx, static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props);
+
+
+ celix_service_tracking_options_t opts{};
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = this;
+ opts.addWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ me->addSerializerSvc(svc, props);
+ };
+ opts.removeWithProperties = [](void *handle, void *svc, const celix_properties_t *props) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ me->removeSerializerSvc(svc, props);
+ };
+ serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+ //register shell command service
+ cmdSvc.handle = this;
+ cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
+ auto me = static_cast<pubsub_nanomsg_admin*>(handle);
+ return me->executeCommand(commandLine, outStream, errorStream);
+ };
+
+ celix_properties_t* shellProps = celix_properties_create();
+ celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
+ celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
+ celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
+ cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
+
+}
- const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+void pubsub_nanomsg_admin::stop() {
+ celix_bundleContext_unregisterService(ctx, adminSvcId);
+ celix_bundleContext_unregisterService(ctx, cmdSvcId);
+ celix_bundleContext_stopTracker(ctx, serializersTrackerId);
+}
+
+void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t *props) {
+ const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, nullptr);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- if (serType == NULL) {
+ if (serType == nullptr) {
L_INFO("[PSA_NANOMSG] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
return;
}
- celixThreadMutex_lock(&psa->serializers.mutex);
- psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)svcId));
- if (entry == NULL) {
+ celixThreadMutex_lock(&serializers.mutex);
+ auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId));
+ if (entry == nullptr) {
entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
entry->serType = serType;
entry->svcId = svcId;
entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
- hashMap_put(psa->serializers.map, (void*)svcId, entry);
+ hashMap_put(serializers.map, (void*)svcId, entry);
}
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
}
-void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void */*svc*/, const celix_properties_t *props) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_properties_t *props) {
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
//remove serializer
@@ -259,82 +281,78 @@ void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void */*svc*/, const
// 3) loop and destroy all topic receivers using the serializer
// Note that it is the responsibility of the topology manager to create new topic senders/receivers
- celixThreadMutex_lock(&psa->serializers.mutex);
- psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(psa->serializers.map, (void*)svcId));
- if (entry != NULL) {
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ celixThreadMutex_lock(&serializers.mutex);
+ auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId));
+ if (entry != nullptr) {
+ celixThreadMutex_lock(&topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
- if (sender != NULL && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+ auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
+ if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
hashMapIterator_remove(&iter);
pubsub_nanoMsgTopicSender_destroy(sender);
free(key);
}
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&topicSenders.mutex);
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
- if (receiver != NULL && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+ auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+ if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
hashMapIterator_remove(&iter);
pubsub_nanoMsgTopicReceiver_destroy(receiver);
free(key);
}
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex);
free(entry);
}
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
}
-celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
double *outScore, long *outSerializerSvcId) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
+ qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
+celix_status_t pubsub_nanomsg_admin::matchSubscriber(long svcProviderBndId,
const celix_properties_t *svcProperties, double *outScore,
long *outSerializerSvcId) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
- if (outScore != NULL) {
+ double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
+ qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
+ if (outScore != nullptr) {
*outScore = score;
}
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *endpoint, bool *outMatch) {
L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, NULL);
- if (outMatch != NULL) {
+ bool match = pubsub_utils_matchEndpoint(ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, nullptr);
+ if (outMatch != nullptr) {
*outMatch = match;
}
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
celix_status_t status = CELIX_SUCCESS;
//1) Create TopicSender
@@ -342,31 +360,31 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *sc
//3) Connect existing endpoints
//4) set outPublisherEndpoint
- celix_properties_t *newEndpoint = NULL;
+ celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&psa->serializers.mutex);
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(psa->topicSenders.map, key));
- if (sender == NULL) {
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
- if (serEntry != NULL) {
- sender = pubsub_nanoMsgTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
- psa->ipAddress, psa->basePort, psa->maxPort);
+ celixThreadMutex_lock(&serializers.mutex);
+ celixThreadMutex_lock(&topicSenders.mutex);
+ auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(topicSenders.map, key));
+ if (sender == nullptr) {
+ auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
+ if (serEntry != nullptr) {
+ sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc,
+ ipAddress, basePort, maxPort);
}
- if (sender != NULL) {
+ if (sender != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, NULL);
+ newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+ serType, nullptr);
celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
//if available also set container name
- const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
- if (cn != NULL) {
+ const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
+ if (cn != nullptr) {
celix_properties_set(newEndpoint, "container_name", cn);
}
- hashMap_put(psa->topicSenders.map, key, sender);
+ hashMap_put(topicSenders.map, key, sender);
} else {
L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
free(key);
@@ -375,74 +393,72 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *sc
free(key);
L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&topicSenders.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
- if (sender != NULL && newEndpoint != NULL) {
+ if (sender != nullptr && newEndpoint != nullptr) {
//TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
}
- if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+ if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
*outPublisherEndpoint = newEndpoint;
}
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, const char *topic) {
celix_status_t status = CELIX_SUCCESS;
//1) Find and remove TopicSender from map
//2) destroy topic sender
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
- if (entry != NULL) {
+ celixThreadMutex_lock(&topicSenders.mutex);
+ hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
+ if (entry != nullptr) {
char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
- pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(psa->topicSenders.map, key));
+ pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(topicSenders.map, key));
free(mapKey);
//TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
pubsub_nanoMsgTopicSender_destroy(sender);
} else {
L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&topicSenders.mutex);
free(key);
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const char *topic,
long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
- celix_properties_t *newEndpoint = NULL;
+ celix_properties_t *newEndpoint = nullptr;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&psa->serializers.mutex);
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(psa->topicReceivers.map, key));
- if (receiver == NULL) {
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
- if (serEntry != NULL) {
- receiver = pubsub_nanoMsgTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId,
+ celixThreadMutex_lock(&serializers.mutex);
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(topicReceivers.map, key));
+ if (receiver == nullptr) {
+ auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId));
+ if (serEntry != nullptr) {
+ receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId,
serEntry->svc);
} else {
L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
- if (receiver != NULL) {
+ if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+ newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic,
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr);
//if available also set container name
- const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
- if (cn != NULL) {
+ const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr);
+ if (cn != nullptr) {
celix_properties_set(newEndpoint, "container_name", cn);
}
- hashMap_put(psa->topicReceivers.map, key, receiver);
+ hashMap_put(topicReceivers.map, key, receiver);
} else {
L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
free(key);
@@ -451,23 +467,23 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *
free(key);
L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
- if (receiver != NULL && newEndpoint != NULL) {
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ if (receiver != nullptr && newEndpoint != nullptr) {
+ celixThreadMutex_lock(&discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
while (hashMapIterator_hasNext(&iter)) {
- celix_properties_t *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
+ if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ connectEndpointToReceiver(receiver, endpoint);
}
}
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_unlock(&discoveredEndpoints.mutex);
}
- if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+ if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
*outSubscriberEndpoint = newEndpoint;
}
@@ -475,29 +491,26 @@ celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
-
+celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) { //removes and destroys the receiver stored under the scope/topic key, if any; always returns CELIX_SUCCESS
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
free(key);
- if (entry != NULL) {
+ if (entry != nullptr) {
char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
- hashMap_remove(psa->topicReceivers.map, receiverKey);
+ hashMap_remove(topicReceivers.map, receiverKey);
free(receiverKey);
pubsub_nanoMsgTopicReceiver_destroy(receiver);
}
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex); //release the lock taken above; locking a second time here would never release the mutex
celix_status_t status = CELIX_SUCCESS;
return status;
}
-static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t * /*psa*/,
- pubsub_nanomsg_topic_receiver_t *receiver,
+celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
@@ -505,17 +518,17 @@ static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanom
const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+ const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
- if (url == NULL) {
-// const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
-// const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ if (url == nullptr) {
+// const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, nullptr);
+// const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
// L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type);
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL &&
+ if (eScope != nullptr && eTopic != nullptr &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
pubsub_nanoMsgTopicReceiver_connectTo(receiver, url);
@@ -525,50 +538,47 @@ static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanom
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
-
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpoint) {
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
- pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ connectEndpointToReceiver(receiver, endpoint);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex);
}
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&discoveredEndpoints.mutex);
celix_properties_t *cpy = celix_properties_copy(endpoint);
- const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
- hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
+ hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
+ celixThreadMutex_unlock(&discoveredEndpoints.mutex);
celix_status_t status = CELIX_SUCCESS;
return status;
}
-static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t * /*psa*/,
- pubsub_nanomsg_topic_receiver_t *receiver,
- const celix_properties_t *endpoint) {
+celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
- const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr);
+ const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr);
- if (url == NULL) {
+ if (url == nullptr) {
L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eScope != NULL && eTopic != NULL &&
+ if (eScope != nullptr && eTopic != nullptr &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url);
@@ -578,27 +588,25 @@ static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
-
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *endpoint) {
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
- if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
- pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+ disconnectEndpointFromReceiver(receiver, endpoint);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex);
}
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
- celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid));
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&discoveredEndpoints.mutex);
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
+ celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid));
+ celixThreadMutex_unlock(&discoveredEndpoints.mutex);
- if (found != NULL) {
+ if (found != nullptr) {
celix_properties_destroy(found);
}
@@ -606,21 +614,20 @@ celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_prop
return status;
}
-celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out,
+celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out,
FILE *errStream __attribute__((unused))) {
- pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
celix_status_t status = CELIX_SUCCESS;
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
- celixThreadMutex_lock(&psa->serializers.mutex);
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ celixThreadMutex_lock(&serializers.mutex);
+ celixThreadMutex_lock(&topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serSvcId));
- const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
+ const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -628,19 +635,19 @@ celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLin
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- url = %s\n", url);
}
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&topicSenders.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
- celixThreadMutex_lock(&psa->serializers.mutex);
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
+ celixThreadMutex_lock(&serializers.mutex);
+ celixThreadMutex_lock(&topicReceivers.mutex);
+ iter = hashMapIterator_construct(topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
- psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serSvcId));
- const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId));
+ const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
@@ -657,8 +664,8 @@ celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLin
fprintf(out, " |- unconnected url = %s\n", url.c_str());
}
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&topicReceivers.mutex);
+ celixThreadMutex_unlock(&serializers.mutex);
fprintf(out, "\n");
return status;
@@ -673,13 +680,13 @@ static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
if (getifaddrs(&ifaddr) != -1)
{
- for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
+ for (ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
{
- if (ifa->ifa_addr == NULL)
+ if (ifa->ifa_addr == nullptr)
continue;
- if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
- if (interface == NULL) {
+ if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
+ if (interface == nullptr) {
*ip = strdup(host);
status = CELIX_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 195ccb7..385b400 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -20,8 +20,11 @@
#ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
#define CELIX_PUBSUB_ZMQ_ADMIN_H
+#include <pubsub_admin.h>
#include "celix_api.h"
#include "log_helper.h"
+#include "pubsub_nanomsg_topic_receiver.h"
+#include "../../../shell/shell/include/command.h"
#define PUBSUB_NANOMSG_ADMIN_TYPE "zmq"
#define PUBSUB_NANOMSG_URL_KEY "zmq.url"
@@ -31,38 +34,95 @@
#define PUBSUB_NANOMSG_PSA_IP_KEY "PSA_IP"
#define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE"
-#define PUBSUB_NANOMSG_NR_THREADS_KEY "PSA_ZMQ_NR_THREADS"
#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
-typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
-
-pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
-void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa);
+//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+class pubsub_nanomsg_admin {
+public:
+ pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+ pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete;
+ pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete;
+ ~pubsub_nanomsg_admin();
+ void start();
+ void stop();
+
+private:
+ void addSerializerSvc(void *svc, const celix_properties_t *props);
+ void removeSerializerSvc(void */*svc*/, const celix_properties_t *props);
+ celix_status_t matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter,
+ double *score, long *serializerSvcId);
+ celix_status_t matchSubscriber(long svcProviderBndId,
+ const celix_properties_t *svcProperties, double *score,
+ long *serializerSvcId);
+ celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool *match);
+
+ celix_status_t setupTopicSender(const char *scope, const char *topic,
+ long serializerSvcId, celix_properties_t **publisherEndpoint);
+ celix_status_t teardownTopicSender(const char *scope, const char *topic);
+
+ celix_status_t setupTopicReceiver(const char *scope, const char *topic,
+ long serializerSvcId, celix_properties_t **subscriberEndpoint);
+ celix_status_t teardownTopicReceiver(const char *scope, const char *topic);
+
+ celix_status_t addEndpoint(const celix_properties_t *endpoint);
+ celix_status_t removeEndpoint(const celix_properties_t *endpoint);
+
+ celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out,
+ FILE *errStream __attribute__((unused)));
+
+ celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ const celix_properties_t *endpoint);
+
+ celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver,
+ const celix_properties_t *endpoint);
+ celix_bundle_context_t *ctx;
+ log_helper_t *log;
+ pubsub_admin_service_t adminService{};
+ long adminSvcId = -1L;
+ long cmdSvcId = -1L;
+ command_service_t cmdSvc{};
+ long serializersTrackerId = -1L;
+
+ const char *fwUUID{};
+
+ char* ipAddress{};
+
+ unsigned int basePort{};
+ unsigned int maxPort{};
+
+ double qosSampleScore{};
+ double qosControlScore{};
+ double defaultScore{};
+
+ bool verbose{};
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
+ } serializers{};
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
+ } topicSenders{};
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_receiver_t*
+ } topicReceivers{};
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+ } discoveredEndpoints{};
+
+};
#ifdef __cplusplus
extern "C" {
#endif
-celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
- double *score, long *serializerSvcId);
- celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
- const celix_properties_t *svcProperties, double *score,
- long *serializerSvcId);
-celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-
-celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
- long serializerSvcId, celix_properties_t **publisherEndpoint);
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-
-celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
- long serializerSvcId, celix_properties_t **subscriberEndpoint);
-celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
-
-celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
-
-void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 646a80e..42f6423 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -373,6 +373,10 @@ static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const p
}
}
+struct Message {
+ pubsub_nanmosg_msg_header_t header;
+ char payload[];
+};
static void* psa_nanomsg_recvThread(void *data) {
pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
bool running{};
@@ -381,30 +385,25 @@ static void* psa_nanomsg_recvThread(void *data) {
running = receiver->recvThread.running;
}
while (running) {
- void * payload = nullptr;
+ Message *msg = nullptr;
nn_iovec iov[2];
- iov[0].iov_base = &payload;
+ iov[0].iov_base = &msg;
iov[0].iov_len = NN_MSG;
nn_msghdr msgHdr;
memset(&msgHdr, 0, sizeof(msgHdr));
msgHdr.msg_iov = iov;
- msgHdr.msg_iovlen = 1;//2;
+ msgHdr.msg_iovlen = 1;
msgHdr.msg_control = nullptr;
msgHdr.msg_controllen = 0;
errno = 0;
int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
- if (payload && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
- pubsub_nanmosg_msg_header_t *header = static_cast<pubsub_nanmosg_msg_header_t*>(payload);
- void* msg = ((char*)payload) + sizeof(*header);
- printf("HEADER: Type %d, major %d, minor %d\n", header->type , (int)header->major, (int)header->minor);
- fprintf(stderr, "RECEIVED %d bytes\n", recvBytes);
- processMsg(receiver, header, (char*)msg, recvBytes-sizeof(header));
-
- nn_freemsg(payload);
+ if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
+ processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header));
+ nn_freemsg(msg);
} else if (recvBytes >= 0) {
L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes);
} else if (errno == EAGAIN || errno == ETIMEDOUT) {
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index feead76..ff0d4f7 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -122,7 +122,6 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
/* Randomized part due to same bundle publishing on different topics */
unsigned int port = rand_range(basePort,maxPort);
-
size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
char *url = static_cast<char*>(calloc(len, sizeof(char*)));
snprintf(url, len, "tcp://%s:%u", bindIP, port);
@@ -130,7 +129,6 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con
len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
-
rv = nn_bind (socket, bindUrl);
if (rv == -1) {
perror("Error for nn_bind");
http://git-wip-us.apache.org/repos/asf/celix/blob/cb740b0d/libs/framework/include/celix_bundle_activator.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_activator.h b/libs/framework/include/celix_bundle_activator.h
index 75e34d2..9bb2f6a 100644
--- a/libs/framework/include/celix_bundle_activator.h
+++ b/libs/framework/include/celix_bundle_activator.h
@@ -115,7 +115,7 @@ celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_contex
#define CELIX_GEN_BUNDLE_ACTIVATOR(actType, actStart, actStop) \
celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx __attribute__((unused)), void **userData) { \
celix_status_t status = CELIX_SUCCESS; \
- actType *data = (actType*)calloc(1, sizeof(*data)); \
+ actType *data = (actType*)calloc(1, sizeof(*data)); \
if (data != NULL) { \
*userData = data; \
} else { \