You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/10/24 14:29:04 UTC
[2/2] celix git commit: NanoMsgAdmin: first version
NanoMsgAdmin: first version
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/4fc1f3d3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/4fc1f3d3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/4fc1f3d3
Branch: refs/heads/nanomsg
Commit: 4fc1f3d3f7398a4b1f0e3993ed889b811f653d95
Parents: 1ffdd94
Author: Erjan Altena <er...@gmail.com>
Authored: Wed Oct 24 14:15:02 2018 +0200
Committer: Erjan Altena <er...@gmail.com>
Committed: Wed Oct 24 14:30:16 2018 +0200
----------------------------------------------------------------------
.../log_service/loghelper_include/log_helper.h | 7 +-
bundles/pubsub/CMakeLists.txt | 2 +
bundles/pubsub/examples/CMakeLists.txt | 88 ++-
.../pubsub/pubsub_admin_nanomsg/CMakeLists.txt | 51 ++
.../pubsub_admin_nanomsg/src/nanomsg_crypto.cc | 281 ++++++++
.../pubsub_admin_nanomsg/src/nanomsg_crypto.h | 41 ++
.../src/psa_nanomsg_activator.cc | 113 +++
.../src/pubsub_nanomsg_admin.cc | 698 +++++++++++++++++++
.../src/pubsub_nanomsg_admin.h | 73 ++
.../src/pubsub_nanomsg_common.cc | 56 ++
.../src/pubsub_nanomsg_common.h | 56 ++
.../src/pubsub_nanomsg_topic_receiver.cc | 420 +++++++++++
.../src/pubsub_nanomsg_topic_receiver.h | 45 ++
.../src/pubsub_nanomsg_topic_sender.cc | 368 ++++++++++
.../src/pubsub_nanomsg_topic_sender.h | 42 ++
.../src/pubsub_psa_nanomsg_constants.h | 50 ++
.../src/pubsub_udpmc_admin.c | 2 +-
.../pubsub/pubsub_spi/include/pubsub_endpoint.h | 23 +-
.../pubsub/pubsub_spi/include/pubsub_utils.h | 50 +-
cmake/Modules/FindNanoMsg.cmake | 42 ++
libs/framework/include/celix_bundle_activator.h | 2 +-
21 files changed, 2471 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/log_service/loghelper_include/log_helper.h
----------------------------------------------------------------------
diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h
index 2ae9d83..28e6877 100644
--- a/bundles/log_service/loghelper_include/log_helper.h
+++ b/bundles/log_service/loghelper_include/log_helper.h
@@ -23,7 +23,9 @@
#include "bundle_context.h"
#include "log_service.h"
-
+#ifdef __cplusplus
+extern "C" {
+#endif
typedef struct log_helper log_helper_t;
typedef struct log_helper* log_helper_pt;
@@ -32,5 +34,8 @@ celix_status_t logHelper_start(log_helper_pt loghelper);
celix_status_t logHelper_stop(log_helper_pt loghelper);
celix_status_t logHelper_destroy(log_helper_pt* loghelper);
celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... );
+#ifdef __cplusplus
+}
+#endif
#endif /* LOGHELPER_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index e3db995..05e6ee1 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -24,12 +24,14 @@ if (PUBSUB)
option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
endif (BUILD_PUBSUB_PSA_ZMQ)
+ option(BUILD_PUBSUB_PSA_NANOMSG "Build NanoMsg PubSub Admin - Experimental" OFF)
add_subdirectory(pubsub_api)
add_subdirectory(pubsub_spi)
add_subdirectory(pubsub_topology_manager)
add_subdirectory(pubsub_discovery)
add_subdirectory(pubsub_serializer_json)
add_subdirectory(pubsub_admin_zmq)
+ add_subdirectory(pubsub_admin_nanomsg)
add_subdirectory(pubsub_admin_udp_mc)
add_subdirectory(keygen)
add_subdirectory(mock)
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index e8113b9..126db2d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -25,7 +25,7 @@ find_package(ZMQ REQUIRED)
find_package(CZMQ REQUIRED)
find_package(Jansson REQUIRED)
-set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} Celix::dfi)
+set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${NANOMSG_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} Celix::dfi)
# UDP Multicast
add_celix_container(pubsub_publisher_udp_mc
@@ -264,5 +264,91 @@ if (BUILD_PUBSUB_PSA_ZMQ)
USE_TERM
)
endif ()
+endif()
+if (BUILD_PUBSUB_PSA_NANOMSG)
+    # Example containers for the experimental NanoMsg PubSub admin.
+    # All four containers deploy the same infrastructure bundles and only
+    # differ in the example bundle (poi publisher or poi subscriber).
+    add_celix_container(pubsub_publisher1_nanomsg
+        GROUP "pubsub"
+        BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_publisher
+        PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+    )
+    target_link_libraries(pubsub_publisher1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+    add_celix_container(pubsub_publisher2_nanomsg
+        GROUP "pubsub"
+        BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_publisher
+        PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+    )
+    target_link_libraries(pubsub_publisher2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+    add_celix_container(pubsub_subscriber1_nanomsg
+        GROUP "pubsub"
+        BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_subscriber
+        PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+    )
+    target_link_libraries(pubsub_subscriber1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+    add_celix_container(pubsub_subscriber2_nanomsg
+        GROUP "pubsub"
+        BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_nanomsg
+            celix_pubsub_poi_subscriber
+        PROPERTIES
+            PSA_NANOMSG_VERBOSE=true
+            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+    )
+    target_link_libraries(pubsub_subscriber2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+
+    if (ETCD_CMD AND XTERM_CMD)
+        # Runtime starting two publishers and two subscribers for nanomsg.
+        # The runtime is named after the nanomsg admin; the previous NAME
+        # "zmq" was a copy/paste leftover from the ZeroMQ runtime.
+        add_celix_runtime(pubsub_rt_nanomsg
+            NAME nanomsg
+            GROUP pubsub
+            CONTAINERS
+                pubsub_publisher1_nanomsg
+                pubsub_publisher2_nanomsg
+                pubsub_subscriber1_nanomsg
+                pubsub_subscriber2_nanomsg
+            COMMANDS
+                etcd
+            USE_TERM
+        )
+    endif ()
endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt b/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
new file mode 100644
index 0000000..ab9806e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_PUBSUB_PSA_NANOMSG)
+
+    # Third-party packages whose variables are used below.
+    find_package(NanoMsg REQUIRED)
+    find_package(Jansson REQUIRED)
+
+    # The NanoMsg PubSub admin bundle and its namespaced alias.
+    add_celix_bundle(celix_pubsub_admin_nanomsg
+        BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_nanomsg"
+        VERSION "1.0.0"
+        GROUP "Celix/PubSub"
+        SOURCES
+            src/psa_nanomsg_activator.cc
+            src/pubsub_nanomsg_admin.cc
+            src/pubsub_nanomsg_topic_sender.cc
+            src/pubsub_nanomsg_topic_receiver.cc
+            src/pubsub_nanomsg_common.cc
+    )
+    add_library(Celix::pubsub_admin_nanomsg ALIAS celix_pubsub_admin_nanomsg)
+
+    set_target_properties(celix_pubsub_admin_nanomsg
+        PROPERTIES
+            INSTALL_RPATH "$ORIGIN"
+    )
+
+    # Include search order is kept as-is: third-party headers first, then
+    # the bundle's own sources and the topology manager sources.
+    target_include_directories(celix_pubsub_admin_nanomsg
+        PRIVATE
+            ${NANOMSG_INCLUDE_DIR}
+            ${JANSSON_INCLUDE_DIR}
+            src
+            ../pubsub_topology_manager/src
+    )
+
+    # All link dependencies in a single call, in the original link order.
+    target_link_libraries(celix_pubsub_admin_nanomsg
+        PRIVATE
+            Celix::pubsub_spi
+            Celix::framework
+            Celix::dfi
+            Celix::log_helper
+            ${NANOMSG_LIBRARIES}
+            Celix::shell_api
+    )
+
+    install_celix_bundle(celix_pubsub_admin_nanomsg EXPORT celix COMPONENT pubsub)
+endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
new file mode 100644
index 0000000..d7d88bb
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.cc
@@ -0,0 +1,281 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * nanomsg_crypto.cc
+ *
+ * \date Dec 2, 2016
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include "nanomsg_crypto.h"
+
+#include <zmq.h>
+#include <openssl/conf.h>
+#include <openssl/evp.h>
+#include <openssl/err.h>
+
+#include <string.h>
+
+#define MAX_FILE_PATH_LENGTH 512
+#define ZMQ_KEY_LENGTH 40
+#define AES_KEY_LENGTH 32
+#define AES_IV_LENGTH 16
+
+#define KEY_TO_GET "aes_key"
+#define IV_TO_GET "aes_iv"
+
+static char* read_file_content(char* filePath, char* fileName);
+static void parse_key_lines(char *keysBuffer, char **key, char **iv);
+static void parse_key_line(char *line, char **key, char **iv);
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
+
+/**
+ * Return a valid zcert_t from an encoded (AES-256-CBC encrypted) file.
+ *
+ * The AES key and iv are read from the keys file keysFilePath/keysFileName
+ * (DEFAULT_KEYS_FILE_PATH / DEFAULT_KEYS_FILE_NAME are used for NULL arguments).
+ * The SHA-256 digests of both values are used as key and iv to decrypt the
+ * content of file_path. The "/curve/public-key" and "/curve/secret-key"
+ * entries of the decrypted text are Z85-decoded into the certificate.
+ *
+ * Returns NULL when any of these steps fails.
+ * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
+ */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
+{
+
+    if (keysFilePath == NULL){
+        keysFilePath = DEFAULT_KEYS_FILE_PATH;
+    }
+
+    if (keysFileName == NULL){
+        keysFileName = DEFAULT_KEYS_FILE_NAME;
+    }
+
+    char* keys_data = read_file_content(keysFilePath, keysFileName);
+    if (keys_data == NULL){
+        return NULL;
+    }
+
+    char *key = NULL;
+    char *iv = NULL;
+    parse_key_lines(keys_data, &key, &iv);
+    free(keys_data);
+
+    if (key == NULL || iv == NULL){
+        free(key);
+        free(iv);
+
+        printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
+        return NULL;
+    }
+
+    //At this point, we know an aes key and iv are stored and loaded
+
+    // generate sha256 hashes; the plain key/iv strings are not needed afterwards
+    unsigned char key_digest[EVP_MAX_MD_SIZE];
+    unsigned char iv_digest[EVP_MAX_MD_SIZE];
+    generate_sha256_hash(key, key_digest);
+    generate_sha256_hash(iv, iv_digest);
+    free(key);
+    free(iv);
+
+    zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
+    if (encoded_secret == NULL){
+        return NULL;
+    }
+
+    int encoded_secret_size = (int) zchunk_size (encoded_secret);
+    char* encoded_secret_data = zchunk_strdup(encoded_secret);
+    zchunk_destroy (&encoded_secret);
+    if (encoded_secret_data == NULL){
+        return NULL;
+    }
+
+    // EVP_DecryptUpdate may write up to (input length + cipher block size) bytes
+    // and one more byte is needed for the terminating '\0'. A heap buffer is used
+    // because the size is only known at runtime.
+    size_t decrypted_capacity = (size_t) encoded_secret_size + EVP_MAX_BLOCK_LENGTH + 1;
+    unsigned char* decryptedtext = (unsigned char*) malloc(decrypted_capacity);
+    if (decryptedtext == NULL){
+        free(encoded_secret_data);
+        return NULL;
+    }
+
+    // Decryption of data
+    int decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
+
+    EVP_cleanup();
+
+    free(encoded_secret_data);
+
+    if (decryptedtext_len < 0 || (size_t) decryptedtext_len >= decrypted_capacity){
+        free(decryptedtext);
+        printf("CRYPTO: Decrypting '%s' failed!\n", file_path);
+        return NULL;
+    }
+    decryptedtext[decryptedtext_len] = '\0';
+
+    // The public and private keys are retrieved; both stay NULL when extraction fails
+    char* public_text = NULL;
+    char* secret_text = NULL;
+
+    extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
+    free(decryptedtext);
+
+    zcert_t* cert_loaded = NULL;
+    if (public_text != NULL && secret_text != NULL){
+        byte public_key [32] = { 0 };
+        byte secret_key [32] = { 0 };
+
+        // zmq_z85_decode returns NULL for input that is not valid Z85
+        if (zmq_z85_decode (public_key, public_text) != NULL && zmq_z85_decode (secret_key, secret_text) != NULL){
+            cert_loaded = zcert_new_from(public_key, secret_key);
+        } else {
+            printf("CRYPTO: Decoding public / secret key failed!\n");
+        }
+    }
+
+    free(public_text);
+    free(secret_text);
+
+    return cert_loaded;
+}
+
+/**
+ * Compute the SHA-256 digest of the NUL-terminated string text.
+ *
+ * digest must provide room for the digest (callers in this file pass
+ * EVP_MAX_MD_SIZE bytes).
+ * Returns the number of digest bytes written, or 0 when OpenSSL reports
+ * an error (the content of digest is then unspecified).
+ */
+int generate_sha256_hash(char* text, unsigned char* digest)
+{
+    unsigned int digest_len = 0;
+
+    EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+    if (mdctx == NULL){
+        return 0;
+    }
+
+    if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
+            || EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
+            || EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
+        digest_len = 0;
+    }
+    EVP_MD_CTX_free(mdctx);
+
+    return (int) digest_len;
+}
+
+/**
+ * Decrypt ciphertext (ciphertext_len bytes) with AES-256-CBC using key and iv.
+ *
+ * plaintext must provide room for ciphertext_len plus one cipher block,
+ * because EVP_DecryptUpdate may write that much.
+ * Returns the number of plaintext bytes written, or 0 when the cipher
+ * context cannot be created or any OpenSSL step fails (for example bad
+ * padding caused by a wrong key).
+ */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
+{
+    int len = 0;
+    int plaintext_len = 0;
+
+    EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+    if (ctx == NULL){
+        return 0;
+    }
+
+    if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+            && EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
+        plaintext_len = len;
+        if (EVP_DecryptFinal_ex(ctx, plaintext + len, &len) == 1){
+            plaintext_len += len;
+        } else {
+            // finalisation failed: do not report partially decrypted data
+            plaintext_len = 0;
+        }
+    }
+
+    EVP_CIPHER_CTX_free(ctx);
+
+    return plaintext_len;
+}
+
+/**
+ * Read the whole content of filePath/fileName into a newly allocated string.
+ * Returns NULL when the file does not exist or cannot be read.
+ * Caller is responsible for freeing the returned value
+ */
+static char* read_file_content(char* filePath, char* fileName){
+
+    char fullPath[MAX_FILE_PATH_LENGTH];
+    snprintf(fullPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
+
+    if (!zsys_file_exists(fullPath)){
+        printf("CRYPTO: Keys file '%s' doesn't exist!\n", fullPath);
+        return NULL;
+    }
+
+    zfile_t* file = zfile_new (filePath, fileName);
+    if (zfile_input (file) != 0){
+        zfile_destroy(&file);
+        printf("CRYPTO: Keys file '%s' not readable!\n", fullPath);
+        return NULL;
+    }
+
+    // Read the complete file in one go and copy it into a C string.
+    zchunk_t* chunk = zfile_read (file, zsys_file_size (fullPath), 0);
+    const bool readFailed = (chunk == NULL);
+    char* content = NULL;
+    if (!readFailed){
+        content = zchunk_strdup(chunk);
+        zchunk_destroy(&chunk);
+    }
+
+    // The file handle is released on both the success and the failure path.
+    zfile_close(file);
+    zfile_destroy (&file);
+
+    if (readFailed){
+        printf("CRYPTO: Can't read file '%s'!\n", fullPath);
+    }
+
+    return content;
+}
+
+/**
+ * Split keysBuffer into lines (the buffer is modified in place) and feed each
+ * line to parse_key_line until both *key and *iv are set or no lines are left.
+ */
+static void parse_key_lines(char *keysBuffer, char **key, char **iv){
+    char *tokenState = NULL;
+
+    for (char *current = strtok_r(keysBuffer, "\n", &tokenState);
+         current != NULL;
+         current = strtok_r(NULL, "\n", &tokenState)) {
+
+        parse_key_line(current, key, iv);
+
+        if (*key != NULL && *iv != NULL){
+            // both values found, remaining lines are not inspected
+            break;
+        }
+    }
+}
+
+/**
+ * Parse one "name:value" line (the line is modified in place).
+ * When name equals KEY_TO_GET or IV_TO_GET and the matching output is still
+ * NULL, the output receives a newly allocated copy of the value, limited to
+ * AES_KEY_LENGTH or AES_IV_LENGTH characters. Other lines are ignored.
+ */
+static void parse_key_line(char *line, char **key, char **iv){
+    char* separator = strchr(line, ':');
+    if (separator == NULL){
+        return;
+    }
+
+    // overwrite first separator, creating two strings.
+    *separator = '\0';
+    const char* name = line;
+    const char* value = separator + 1;
+
+    if (name[0] == '\0' || value[0] == '\0'){
+        return;
+    }
+
+    if (strcmp(name, KEY_TO_GET) == 0){
+        if (*key == NULL){
+            *key = strndup(value, AES_KEY_LENGTH);
+        }
+    } else if (strcmp(name, IV_TO_GET) == 0){
+        if (*iv == NULL){
+            *iv = strndup(value, AES_IV_LENGTH);
+        }
+    }
+}
+
+/**
+ * Load the decrypted text in input (inputlen bytes) as a zconfig and copy its
+ * "/curve/public-key" and "/curve/secret-key" values into newly allocated
+ * strings. On any failure a message is printed and *publicKey / *secretKey
+ * are left untouched.
+ */
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
+    // Load decrypted text buffer
+    zchunk_t* chunk = zchunk_new(input, inputlen);
+    if (chunk == NULL){
+        printf("CRYPTO: Failed to create zchunk\n");
+        return;
+    }
+
+    zconfig_t* config = zconfig_chunk_load (chunk);
+    zchunk_destroy (&chunk);
+    if (config == NULL){
+        printf("CRYPTO: Failed to create zconfig\n");
+        return;
+    }
+
+    // Extract public and secret key from text buffer
+    char* publicValue = zconfig_get (config, "/curve/public-key", NULL);
+    char* secretValue = zconfig_get (config, "/curve/secret-key", NULL);
+
+    if (publicValue != NULL && secretValue != NULL){
+        // copy the values before the config that owns them is destroyed
+        *publicKey = strndup(publicValue, ZMQ_KEY_LENGTH + 1);
+        *secretKey = strndup(secretValue, ZMQ_KEY_LENGTH + 1);
+        zconfig_destroy(&config);
+    } else {
+        zconfig_destroy(&config);
+        printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+    }
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
new file mode 100644
index 0000000..f1a990f
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/nanomsg_crypto.h
@@ -0,0 +1,41 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * nanomsg_crypto.h
+ *
+ * \date Dec 2, 2016
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+// Include guard named after this header, so it cannot shadow a header that
+// uses the ZMQ_CRYPTO_H_ guard when both are included in one translation unit.
+#ifndef NANOMSG_CRYPTO_H_
+#define NANOMSG_CRYPTO_H_
+
+#include <czmq.h>
+
+// Property names and default values for locating the AES keys file.
+#define PROPERTY_KEYS_FILE_PATH "keys.file.path"
+#define PROPERTY_KEYS_FILE_NAME "keys.file.name"
+#define DEFAULT_KEYS_FILE_PATH "/etc/"
+#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
+
+// Loads a certificate from the encrypted file file_path, using the AES key/iv
+// found in keysFilePath/keysFileName. Returns NULL on failure; the caller
+// releases the result with zcert_destroy.
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path);
+// Writes the SHA-256 digest of text into digest and returns the digest length.
+int generate_sha256_hash(char* text, unsigned char* digest);
+// AES-256-CBC decrypts ciphertext into plaintext and returns the plaintext length.
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
+
+#endif /* NANOMSG_CRYPTO_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
new file mode 100644
index 0000000..79ea1d4
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -0,0 +1,113 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_nanomsg_admin.h"
+#include "../../../shell/shell/include/command.h"
+
+// State of the NanoMsg PSA bundle activator: filled by psa_nanomsg_start, released by psa_nanomsg_stop.
+typedef struct psa_nanomsg_activator {
+ log_helper_t *logHelper; // created and started in psa_nanomsg_start
+
+ pubsub_nanomsg_admin_t *admin; // admin instance used as handle for the services below
+
+ long serializersTrackerId; // id of the serializer service tracker, -1 when not tracking
+
+ pubsub_admin_service_t adminService; // service struct registered under PUBSUB_ADMIN_SERVICE_NAME
+ long adminSvcId; // -1 when the admin service is not registered
+
+ command_service_t cmdSvc; // service struct for the "psa_nanomsg" shell command
+ long cmdSvcId; // -1 when the command service is not registered
+} psa_nanomsg_activator_t;
+
+/**
+ * Bundle start callback of the NanoMsg PubSub admin.
+ *
+ * Creates the log helper and the admin, starts tracking serializer services
+ * and registers the pubsub admin service and the "psa_nanomsg" shell command.
+ * All services are only registered when the admin could be created, so no
+ * service is ever published with a NULL handle.
+ *
+ * Returns CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when the admin could not
+ * be created.
+ */
+int psa_nanomsg_start(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
+    // -1 marks "not registered / not tracking" for psa_nanomsg_stop
+    act->adminSvcId = -1L;
+    act->cmdSvcId = -1L;
+    act->serializersTrackerId = -1L;
+
+    logHelper_create(ctx, &act->logHelper);
+    logHelper_start(act->logHelper);
+
+    act->admin = pubsub_nanoMsgAdmin_create(ctx, act->logHelper);
+    celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+    //track serializers
+    if (status == CELIX_SUCCESS) {
+        celix_service_tracking_options_t opts{};
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = act->admin;
+        opts.addWithProperties = pubsub_nanoMsgAdmin_addSerializerSvc;
+        opts.removeWithProperties = pubsub_nanoMsgAdmin_removeSerializerSvc;
+        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register pubsub admin service
+    if (status == CELIX_SUCCESS) {
+        pubsub_admin_service_t *psaSvc = &act->adminService;
+        psaSvc->handle = act->admin;
+        psaSvc->matchPublisher = pubsub_nanoMsgAdmin_matchPublisher;
+        psaSvc->matchSubscriber = pubsub_nanoMsgAdmin_matchSubscriber;
+        psaSvc->matchEndpoint = pubsub_nanoMsgAdmin_matchEndpoint;
+        psaSvc->setupTopicSender = pubsub_nanoMsgAdmin_setupTopicSender;
+        psaSvc->teardownTopicSender = pubsub_nanoMsgAdmin_teardownTopicSender;
+        psaSvc->setupTopicReceiver = pubsub_nanoMsgAdmin_setupTopicReceiver;
+        psaSvc->teardownTopicReceiver = pubsub_nanoMsgAdmin_teardownTopicReceiver;
+        psaSvc->addEndpoint = pubsub_nanoMsgAdmin_addEndpoint;
+        psaSvc->removeEndpoint = pubsub_nanoMsgAdmin_removeEndpoint;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE);
+
+        act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+    }
+
+    //register shell command service
+    if (status == CELIX_SUCCESS) {
+        act->cmdSvc.handle = act->admin;
+        act->cmdSvc.executeCommand = pubsub_nanoMsgAdmin_executeCommand;
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the NanoMsg PSA");
+        act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+    }
+
+    return status;
+}
+
+// Bundle stop callback: undoes psa_nanomsg_start. Always returns CELIX_SUCCESS.
+int psa_nanomsg_stop(psa_nanomsg_activator_t *act, celix_bundle_context_t *ctx) {
+ celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+ celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+ pubsub_nanoMsgAdmin_destroy(act->admin); // after the services and tracker that use the admin as handle are gone
+
+ logHelper_stop(act->logHelper); // last, because the admin was created with this log helper
+ logHelper_destroy(&act->logHelper);
+
+ return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_nanomsg_activator_t, psa_nanomsg_start, psa_nanomsg_stop);
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
new file mode 100644
index 0000000..f3e6831
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -0,0 +1,698 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <string>
+#include <vector>
+#include <memory.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_nanomsg_admin.h"
+#include "pubsub_psa_nanomsg_constants.h"
+#include "pubsub_nanomsg_topic_sender.h"
+#include "pubsub_nanomsg_topic_receiver.h"
+/*
+//#define L_DEBUG(...) \
+// logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+//#define L_INFO(...) \
+// logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+//#define L_WARN(...) \
+// logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+//#define L_ERROR(...) \
+// logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+*/
+#define L_DEBUG printf
+#define L_INFO printf
+#define L_WARN printf
+#define L_ERROR printf
+
+// State of one NanoMsg PubSub admin; allocated in pubsub_nanoMsgAdmin_create and freed in pubsub_nanoMsgAdmin_destroy.
+struct pubsub_nanomsg_admin {
+ celix_bundle_context_t *ctx;
+ log_helper_t *log;
+ const char *fwUUID; // value of the OSGI_FRAMEWORK_FRAMEWORK_UUID property
+
+ char* ipAddress; // heap allocated, freed in pubsub_nanoMsgAdmin_destroy
+
+ unsigned int basePort; // from PSA_NANOMSG_BASE_PORT
+ unsigned int maxPort; // from PSA_NANOMSG_MAX_PORT
+
+ double qosSampleScore;
+ double qosControlScore;
+ double defaultScore;
+
+ bool verbose; // from PUBSUB_NANOMSG_VERBOSE_KEY
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
+ } serializers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
+ } topicSenders;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_receiver_t*
+ } topicReceivers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
+ } discoveredEndpoints;
+
+};
+
+// Bookkeeping for one tracked serializer service; stored in serializers.map keyed by svcId.
+typedef struct psa_nanomsg_serializer_entry {
+ const char *serType; // PUBSUB_SERIALIZER_TYPE_KEY property value; points into the service properties, not copied
+ long svcId; // OSGI_FRAMEWORK_SERVICE_ID property value
+ pubsub_serializer_service_t *svc;
+} psa_nanomsg_serializer_entry_t;
+
+static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
+static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t *psa,
+ pubsub_nanomsg_topic_receiver_t *receiver,
+ const celix_properties_t *endpoint);
+static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t *psa,
+ pubsub_nanomsg_topic_receiver_t *receiver,
+ const celix_properties_t *endpoint);
+
+
+/**
+ * Create a NanoMsg PubSub admin.
+ *
+ * Reads its configuration (verbosity, ip address or interface, port range and
+ * match scores) from the bundle context properties and creates the mutexes
+ * and hash maps for serializers, topic senders, topic receivers and
+ * discovered endpoints.
+ *
+ * The ip address is taken from PUBSUB_NANOMSG_PSA_IP_KEY when set, otherwise
+ * from the interface configured with PUBSUB_NANOMSG_PSA_ITF_KEY, and finally
+ * falls back to PUBSUB_NANOMSG_DEFAULT_IP.
+ *
+ * Returns the new admin, or NULL when it cannot be allocated. The result is
+ * released with pubsub_nanoMsgAdmin_destroy.
+ */
+pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+    pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(calloc(1, sizeof(*psa)));
+    if (psa == NULL) {
+        // the activator maps a NULL admin to CELIX_BUNDLE_EXCEPTION
+        return NULL;
+    }
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    char *ip = NULL;
+    const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , NULL);
+    if (confIp != NULL) {
+        ip = strndup(confIp, 1024);
+    }
+
+    if (ip == NULL) {
+        //TODO try to get ip from subnet (CIDR)
+    }
+
+    if (ip == NULL) {
+        //try to get ip from itf
+        const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, NULL);
+        nanoMsg_getIpAddress(interface, &ip);
+    }
+
+    if (ip == NULL) {
+        // the L_* macros map to printf, so the newline has to be part of the message
+        L_WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (%s)\n", PUBSUB_NANOMSG_DEFAULT_IP);
+        ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
+    }
+
+    psa->ipAddress = ip;
+    if (psa->verbose) {
+        L_INFO("[PSA_NANOMSG] Using %s for service annunciation\n", ip);
+    }
+
+
+    long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
+    long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
+    psa->basePort = (unsigned int)basePort;
+    psa->maxPort = (unsigned int)maxPort;
+    if (psa->verbose) {
+        // both ports are unsigned, so %u is the matching conversion
+        L_INFO("[PSA_NANOMSG] Using base till max port: %u till %u\n", psa->basePort, psa->maxPort);
+    }
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
+
+    // serializers are keyed by service id, the other maps by string
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    return psa;
+}
+
+// Destroys the admin: releases all remaining senders, receivers, endpoints and serializer entries, then the maps, mutexes and the admin itself. A NULL psa is ignored.
+void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa) {
+ if (psa == NULL) {
+ return;
+ }
+
+ //note: assuming all psa registered services and service trackers are removed.
+
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
+ pubsub_nanoMsgTopicSender_destroy(sender);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_nanomsg_topic_receiver_t *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+ pubsub_nanoMsgTopicReceiver_destroy(recv);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+ celix_properties_destroy(ep);
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+ celixThreadMutex_lock(&psa->serializers.mutex);
+ iter = hashMapIterator_construct(psa->serializers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
+ free(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializers.mutex);
+
+ celixThreadMutex_destroy(&psa->topicSenders.mutex);
+ hashMap_destroy(psa->topicSenders.map, true, false); // values were already destroyed in the loop above
+
+ celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+ hashMap_destroy(psa->topicReceivers.map, true, false); // values were already destroyed in the loop above
+
+ celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+ hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+
+ celixThreadMutex_destroy(&psa->serializers.mutex);
+ hashMap_destroy(psa->serializers.map, false, false);
+
+ free(psa->ipAddress);
+
+ free(psa);
+}
+
+ /**
+  * Registers a serializer service with the admin.
+  *
+  * @param handle The pubsub_nanomsg_admin_t instance.
+  * @param svc    The serializer service (stored as pubsub_serializer_service_t*).
+  * @param props  The service properties; must contain PUBSUB_SERIALIZER_TYPE_KEY,
+  *               otherwise the service is ignored. The service id is read from
+  *               OSGI_FRAMEWORK_SERVICE_ID (-1 when absent).
+  *
+  * An entry is only created when no entry exists yet for the service id.
+  */
+ void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+     if (serType == NULL) {
+         L_INFO("[PSA_NANOMSG] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY);
+         return;
+     }
+
+     celixThreadMutex_lock(&psa->serializers.mutex);
+     psa_nanomsg_serializer_entry_t *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)svcId));
+     if (entry == NULL) {
+         entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+         if (entry != NULL) {
+             // NOTE(review): serType points into props; presumably it stays valid for as long as the
+             // service is registered — confirm against the service tracker contract.
+             entry->serType = serType;
+             entry->svcId = svcId;
+             entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+             hashMap_put(psa->serializers.map, (void*)svcId, entry);
+         } else {
+             // Allocation failed: do not dereference the NULL entry, just report it.
+             L_ERROR("[PSA_NANOMSG] Cannot allocate serializer entry for service id %li", svcId);
+         }
+     }
+     celixThreadMutex_unlock(&psa->serializers.mutex);
+ }
+
+ /**
+  * Unregisters a serializer service from the admin.
+  *
+  * The serializer entry for the service id (OSGI_FRAMEWORK_SERVICE_ID in props) is
+  * removed from the serializers map. When such an entry existed, every topic sender
+  * and topic receiver that reports this serializer service id is removed from its map
+  * and destroyed, together with its map key. The serializers mutex is held for the
+  * whole operation; the sender and receiver mutexes are taken one after the other.
+  *
+  * Note that it is the responsibility of the topology manager to create new topic
+  * senders/receivers afterwards.
+  */
+ void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void * /*svc*/, const celix_properties_t *props) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+     const long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+     celixThreadMutex_lock(&psa->serializers.mutex);
+     psa_nanomsg_serializer_entry_t *removed = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(psa->serializers.map, (void*)svcId));
+     if (removed == NULL) {
+         // Unknown serializer: nothing depends on it.
+         celixThreadMutex_unlock(&psa->serializers.mutex);
+         return;
+     }
+
+     // Drop all topic senders that use the removed serializer.
+     celixThreadMutex_lock(&psa->topicSenders.mutex);
+     for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicSenders.map); hashMapIterator_hasNext(&it);) {
+         hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&it);
+         pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
+         if (sender == NULL || removed->svcId != pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
+             continue;
+         }
+         // Fetch the key before the entry is removed from the map.
+         char *senderKey = static_cast<char*>(hashMapEntry_getKey(senderEntry));
+         hashMapIterator_remove(&it);
+         pubsub_nanoMsgTopicSender_destroy(sender);
+         free(senderKey);
+     }
+     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+     // Drop all topic receivers that use the removed serializer.
+     celixThreadMutex_lock(&psa->topicReceivers.mutex);
+     for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicReceivers.map); hashMapIterator_hasNext(&it);) {
+         hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&it);
+         pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(receiverEntry));
+         if (receiver == NULL || removed->svcId != pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
+             continue;
+         }
+         char *receiverKey = static_cast<char*>(hashMapEntry_getKey(receiverEntry));
+         hashMapIterator_remove(&it);
+         pubsub_nanoMsgTopicReceiver_destroy(receiver);
+         free(receiverKey);
+     }
+     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+     free(removed);
+     celixThreadMutex_unlock(&psa->serializers.mutex);
+ }
+
+ /**
+  * Computes how well this admin matches a requested publisher.
+  *
+  * The score comes from pubsub_utils_matchPublisher, called with the filter string of
+  * svcFilter, the admin type and the admin's qos sample, qos control and default scores.
+  *
+  * @param handle             The pubsub_nanomsg_admin_t instance.
+  * @param svcRequesterBndId  Bundle id of the service requester.
+  * @param svcFilter          Filter of the requested publisher service; must not be NULL.
+  * @param outScore           Receives the score; may be NULL, in which case it is not written
+  *                           (same convention as matchSubscriber/matchEndpoint).
+  * @param outSerializerSvcId Passed through to pubsub_utils_matchPublisher.
+  * @return Always CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+                                                   double *outScore, long *outSerializerSvcId) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+     L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
+     celix_status_t status = CELIX_SUCCESS;
+     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+     if (outScore != NULL) {
+         *outScore = score;
+     }
+
+     return status;
+ }
+
+ /**
+  * Computes how well this admin matches a subscriber service.
+  *
+  * Delegates to pubsub_utils_matchSubscriber with the admin type and the admin's
+  * qos sample, qos control and default scores; outSerializerSvcId is passed through.
+  * The resulting score is only stored when outScore is not NULL.
+  * Always returns CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
+                                                    const celix_properties_t *svcProperties, double *outScore,
+                                                    long *outSerializerSvcId) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+     L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
+
+     const double matchScore = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties,
+                                                            PUBSUB_NANOMSG_ADMIN_TYPE, psa->qosSampleScore,
+                                                            psa->qosControlScore, psa->defaultScore,
+                                                            outSerializerSvcId);
+     if (outScore != NULL) {
+         *outScore = matchScore;
+     }
+     return CELIX_SUCCESS;
+ }
+
+ /**
+  * Reports whether a discovered endpoint matches this admin.
+  *
+  * The decision is taken by pubsub_utils_matchEndpoint for the admin type
+  * PUBSUB_NANOMSG_ADMIN_TYPE. The result is only stored when outMatch is not NULL.
+  * Always returns CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+     L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
+
+     const bool isMatch = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, NULL);
+     if (outMatch != NULL) {
+         *outMatch = isMatch;
+     }
+     return CELIX_SUCCESS;
+ }
+
+ /**
+  * Creates and stores a topic sender for the given scope/topic.
+  *
+  * Steps:
+  *  1) Create the TopicSender using the serializer registered under serializerSvcId.
+  *  2) Store the TopicSender under its scope/topic key.
+  *  3) Create the publisher endpoint properties (including the sender url and, when the
+  *     CELIX_CONTAINER_NAME framework property is set, the container name).
+  *  4) Hand the endpoint to the caller through outPublisherEndpoint.
+  *
+  * @param handle               The pubsub_nanomsg_admin_t instance.
+  * @param scope                Topic scope.
+  * @param topic                Topic name.
+  * @param serializerSvcId      Service id of the serializer to use.
+  * @param outPublisherEndpoint Receives the new endpoint; only written when a sender was
+  *                             created and the pointer is not NULL.
+  * @return Always CELIX_SUCCESS; failures are only logged.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+                                                     long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+     celix_status_t status = CELIX_SUCCESS;
+
+     celix_properties_t *newEndpoint = NULL;
+
+     // The key is owned by the map once stored; on every other path it is freed here.
+     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+     // Lock order: serializers before topicSenders (same order as removeSerializerSvc).
+     celixThreadMutex_lock(&psa->serializers.mutex);
+     celixThreadMutex_lock(&psa->topicSenders.mutex);
+     pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(psa->topicSenders.map, key));
+     if (sender == NULL) {
+         psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
+         if (serEntry != NULL) {
+             sender = pubsub_nanoMsgTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
+                                                       psa->ipAddress, psa->basePort, psa->maxPort);
+         } else {
+             // Report the missing serializer explicitly, like setupTopicReceiver does.
+             L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic);
+         }
+         if (sender != NULL) {
+             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+             const char *serType = serEntry->serType;
+             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                 serType, NULL);
+             celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
+             //if available also set container name
+             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+             if (cn != NULL) {
+                 celix_properties_set(newEndpoint, "container_name", cn);
+             }
+             hashMap_put(psa->topicSenders.map, key, sender);
+         } else {
+             L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
+             free(key);
+         }
+     } else {
+         free(key);
+         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+     }
+     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+     celixThreadMutex_unlock(&psa->serializers.mutex);
+
+     //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
+
+     if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+         *outPublisherEndpoint = newEndpoint;
+     }
+
+     return status;
+ }
+
+ /**
+  * Removes and destroys the topic sender registered for scope/topic.
+  *
+  * The sender is looked up by its scope/topic key; when found it is removed from the
+  * map, its stored key is freed and the sender is destroyed. When no sender exists an
+  * error is logged. Always returns CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     // Temporary lookup key; the map owns its own copy of the key string.
+     char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+     celixThreadMutex_lock(&psa->topicSenders.mutex);
+     hash_map_entry_t *found = hashMap_getEntry(psa->topicSenders.map, lookupKey);
+     if (found == NULL) {
+         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+     } else {
+         // Read the stored key before removing the entry from the map.
+         char *storedKey = static_cast<char*>(hashMapEntry_getKey(found));
+         pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(psa->topicSenders.map, lookupKey));
+         free(storedKey);
+         //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender?
+         pubsub_nanoMsgTopicSender_destroy(sender);
+     }
+     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+     free(lookupKey);
+     return CELIX_SUCCESS;
+ }
+
+ /**
+  * Creates and stores a topic receiver for the given scope/topic.
+  *
+  * Steps:
+  *  1) Create the TopicReceiver using the serializer registered under serializerSvcId.
+  *  2) Store the TopicReceiver under its scope/topic key and create the subscriber
+  *     endpoint properties (with the container name when CELIX_CONTAINER_NAME is set).
+  *  3) Offer every already discovered publisher endpoint to the new receiver.
+  *  4) Hand the endpoint to the caller through outSubscriberEndpoint.
+  *
+  * @param handle                The pubsub_nanomsg_admin_t instance.
+  * @param scope                 Topic scope.
+  * @param topic                 Topic name.
+  * @param serializerSvcId       Service id of the serializer to use.
+  * @param outSubscriberEndpoint Receives the new endpoint; only written when a receiver was
+  *                              created and the pointer is not NULL.
+  * @return Always CELIX_SUCCESS; failures are only logged.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+                                                       long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     celix_properties_t *newEndpoint = NULL;
+
+     // The key is owned by the map once stored; on every other path it is freed here.
+     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+     // Lock order: serializers before topicReceivers (same order as removeSerializerSvc).
+     celixThreadMutex_lock(&psa->serializers.mutex);
+     celixThreadMutex_lock(&psa->topicReceivers.mutex);
+     pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(psa->topicReceivers.map, key));
+     if (receiver == NULL) {
+         psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)serializerSvcId));
+         if (serEntry != NULL) {
+             receiver = pubsub_nanoMsgTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId,
+                                                           serEntry->svc);
+         } else {
+             L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicReceiver %s/%s", scope, topic);
+         }
+         if (receiver != NULL) {
+             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+             const char *serType = serEntry->serType;
+             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                                 PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+             //if available also set container name
+             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+             if (cn != NULL) {
+                 celix_properties_set(newEndpoint, "container_name", cn);
+             }
+             hashMap_put(psa->topicReceivers.map, key, receiver);
+         } else {
+             L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
+             free(key);
+         }
+     } else {
+         free(key);
+         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+     }
+     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+     celixThreadMutex_unlock(&psa->serializers.mutex);
+
+     // newEndpoint is only set when the receiver was created by this call, so an
+     // already existing receiver is not connected a second time.
+     if (receiver != NULL && newEndpoint != NULL) {
+         celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+         hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+         while (hashMapIterator_hasNext(&iter)) {
+             celix_properties_t *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+             if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+                 pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+             }
+         }
+         celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+     }
+
+     if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+         *outSubscriberEndpoint = newEndpoint;
+     }
+
+     celix_status_t status = CELIX_SUCCESS;
+     return status;
+ }
+
+ /**
+  * Removes and destroys the topic receiver registered for scope/topic.
+  *
+  * The receiver is looked up by its scope/topic key; when found it is removed from the
+  * map, its stored key is freed and the receiver is destroyed. When no receiver exists
+  * an error is logged (same behaviour as teardownTopicSender).
+  *
+  * @return Always CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     // Temporary lookup key; the map owns its own copy of the key string.
+     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+     celixThreadMutex_lock(&psa->topicReceivers.mutex);
+     hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+     free(key);
+     if (entry != NULL) {
+         // Read key and value before the entry is removed from the map.
+         char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
+         pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
+         hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+         free(receiverKey);
+         pubsub_nanoMsgTopicReceiver_destroy(receiver);
+     } else {
+         L_ERROR("[PSA NANOMSG] Cannot teardown TopicReceiver with scope/topic %s/%s. Does not exist", scope, topic);
+     }
+     // Release the mutex taken above; locking it a second time here would leave it held forever.
+     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+     celix_status_t status = CELIX_SUCCESS;
+     return status;
+ }
+
+ /**
+  * Connects a receiver to an endpoint when both refer to the same scope and topic.
+  *
+  * Returns CELIX_BUNDLE_EXCEPTION when the endpoint has no PUBSUB_NANOMSG_URL_KEY
+  * property (no warning is logged in that case), CELIX_SUCCESS otherwise — also when
+  * scope/topic do not match and nothing is connected.
+  *
+  * Note: can be called with the discoveredEndpoints mutex locked.
+  */
+ static celix_status_t pubsub_nanoMsgAdmin_connectEndpointToReceiver(pubsub_nanomsg_admin_t * /*psa*/,
+                                                                     pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                     const celix_properties_t *endpoint) {
+     const char *receiverScope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+     const char *receiverTopic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+     const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+     const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+     const char *endpointUrl = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+
+     if (endpointUrl == NULL) {
+         return CELIX_BUNDLE_EXCEPTION;
+     }
+
+     // Comparison is bounded to 1 MiB per string.
+     const bool sameScopeAndTopic = endpointScope != NULL && endpointTopic != NULL &&
+                                    strncmp(endpointScope, receiverScope, 1024 * 1024) == 0 &&
+                                    strncmp(endpointTopic, receiverTopic, 1024 * 1024) == 0;
+     if (sameScopeAndTopic) {
+         pubsub_nanoMsgTopicReceiver_connectTo(receiver, endpointUrl);
+     }
+     return CELIX_SUCCESS;
+ }
+
+ /**
+  * Handles a newly discovered endpoint.
+  *
+  * Publisher endpoints are offered to every known topic receiver. Afterwards a copy of
+  * the endpoint properties is stored in the discoveredEndpoints map, keyed by the
+  * endpoint uuid (the key string lives inside the stored copy).
+  *
+  * @param handle   The pubsub_nanomsg_admin_t instance.
+  * @param endpoint The discovered endpoint properties; must contain PUBSUB_ENDPOINT_UUID.
+  * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when the endpoint has no uuid or
+  *         could not be copied (the endpoint is ignored in both cases).
+  */
+ celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     // The uuid is used as a string key in the map; without it the endpoint can neither be
+     // stored nor removed again, so reject it up front.
+     if (celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL) == NULL) {
+         L_WARN("[PSA_NANOMSG] Ignoring endpoint without %s property", PUBSUB_ENDPOINT_UUID);
+         return CELIX_BUNDLE_EXCEPTION;
+     }
+
+     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+     if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+         celixThreadMutex_lock(&psa->topicReceivers.mutex);
+         hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+         while (hashMapIterator_hasNext(&iter)) {
+             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+             pubsub_nanoMsgAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+         }
+         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+     }
+
+     celix_status_t status = CELIX_SUCCESS;
+     celix_properties_t *previous = NULL;
+
+     celix_properties_t *cpy = celix_properties_copy(endpoint);
+     if (cpy != NULL) {
+         const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
+         celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+         // Remove an earlier copy stored under the same uuid first, so that the map key always
+         // points into the properties that are currently stored and the old copy can be released.
+         previous = static_cast<celix_properties_t*>(hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid));
+         hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
+         celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+     } else {
+         L_WARN("[PSA_NANOMSG] Cannot copy endpoint properties, endpoint is not stored");
+         status = CELIX_BUNDLE_EXCEPTION;
+     }
+
+     if (previous != NULL) {
+         celix_properties_destroy(previous);
+     }
+
+     return status;
+ }
+
+
+ /**
+  * Disconnects a receiver from an endpoint when both refer to the same scope and topic.
+  *
+  * Logs a warning and returns CELIX_BUNDLE_EXCEPTION when the endpoint has no
+  * PUBSUB_NANOMSG_URL_KEY property; returns CELIX_SUCCESS otherwise — also when
+  * scope/topic do not match and nothing is disconnected.
+  *
+  * Note: can be called with the discoveredEndpoints mutex locked.
+  */
+ static celix_status_t pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(pubsub_nanomsg_admin_t * /*psa*/,
+                                                                          pubsub_nanomsg_topic_receiver_t *receiver,
+                                                                          const celix_properties_t *endpoint) {
+     const char *receiverScope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+     const char *receiverTopic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+     const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+     const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+     const char *endpointUrl = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, NULL);
+
+     if (endpointUrl == NULL) {
+         L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
+         return CELIX_BUNDLE_EXCEPTION;
+     }
+
+     // Comparison is bounded to 1 MiB per string.
+     const bool sameScopeAndTopic = endpointScope != NULL && endpointTopic != NULL &&
+                                    strncmp(endpointScope, receiverScope, 1024 * 1024) == 0 &&
+                                    strncmp(endpointTopic, receiverTopic, 1024 * 1024) == 0;
+     if (sameScopeAndTopic) {
+         pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, endpointUrl);
+     }
+     return CELIX_SUCCESS;
+ }
+
+ /**
+  * Handles the removal of a discovered endpoint.
+  *
+  * Publisher endpoints are disconnected from every known topic receiver. Afterwards the
+  * copy stored under the endpoint uuid is removed from the discoveredEndpoints map and
+  * destroyed. An endpoint without PUBSUB_ENDPOINT_UUID cannot be looked up in the map;
+  * a warning is logged and the map is left untouched.
+  *
+  * @return Always CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+
+     if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+         celixThreadMutex_lock(&psa->topicReceivers.mutex);
+         hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+         while (hashMapIterator_hasNext(&iter)) {
+             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+             pubsub_nanoMsgAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+         }
+         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+     }
+
+     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+     celix_properties_t *found = NULL;
+     if (uuid != NULL) {
+         celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+         found = static_cast<celix_properties_t*>(hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid));
+         celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+     } else {
+         // The map hashes its keys as strings, so a NULL uuid must not be used for the lookup.
+         L_WARN("[PSA_NANOMSG] Cannot remove endpoint without %s property", PUBSUB_ENDPOINT_UUID);
+     }
+
+     // Destroy the stored copy outside the lock.
+     if (found != NULL) {
+         celix_properties_destroy(found);
+     }
+
+     celix_status_t status = CELIX_SUCCESS;
+     return status;
+ }
+
+ /**
+  * Shell command implementation: prints an overview of the admin to `out`.
+  *
+  * For every topic sender the scope/topic, serializer type and url are listed; for
+  * every topic receiver the scope/topic, serializer type and the connected and
+  * unconnected urls. A serializer that can no longer be found is shown as "!Error!".
+  * commandLine and errStream are not used. Always returns CELIX_SUCCESS.
+  */
+ celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out,
+                                                   FILE *errStream __attribute__((unused))) {
+     pubsub_nanomsg_admin_t *psa = static_cast<pubsub_nanomsg_admin_t*>(handle);
+
+     fprintf(out, "\n");
+     fprintf(out, "Topic Senders:\n");
+     celixThreadMutex_lock(&psa->serializers.mutex);
+     celixThreadMutex_lock(&psa->topicSenders.mutex);
+     for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicSenders.map); hashMapIterator_hasNext(&it);) {
+         pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&it));
+         const long svcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+         psa_nanomsg_serializer_entry_t *serializer = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)svcId));
+         const char *serializerType = "!Error!";
+         if (serializer != NULL) {
+             serializerType = serializer->serType;
+         }
+         const char *senderScope = pubsub_nanoMsgTopicSender_scope(sender);
+         const char *senderTopic = pubsub_nanoMsgTopicSender_topic(sender);
+         const char *senderUrl = pubsub_nanoMsgTopicSender_url(sender);
+         fprintf(out, "|- Topic Sender %s/%s\n", senderScope, senderTopic);
+         fprintf(out, " |- serializer type = %s\n", serializerType);
+         fprintf(out, " |- url = %s\n", senderUrl);
+     }
+     celixThreadMutex_unlock(&psa->topicSenders.mutex);
+     celixThreadMutex_unlock(&psa->serializers.mutex);
+
+     fprintf(out, "\n");
+     fprintf(out, "\nTopic Receivers:\n");
+     celixThreadMutex_lock(&psa->serializers.mutex);
+     celixThreadMutex_lock(&psa->topicReceivers.mutex);
+     for (hash_map_iterator_t it = hashMapIterator_construct(psa->topicReceivers.map); hashMapIterator_hasNext(&it);) {
+         pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&it));
+         const long svcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+         psa_nanomsg_serializer_entry_t *serializer = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(psa->serializers.map, (void*)svcId));
+         const char *serializerType = "!Error!";
+         if (serializer != NULL) {
+             serializerType = serializer->serType;
+         }
+         const char *receiverScope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+         const char *receiverTopic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+         // Filled by the receiver with the urls it is / is not connected to.
+         std::vector<std::string> connectedUrls{};
+         std::vector<std::string> unconnectedUrls{};
+         pubsub_nanoMsgTopicReceiver_listConnections(receiver, connectedUrls, unconnectedUrls);
+
+         fprintf(out, "|- Topic Receiver %s/%s\n", receiverScope, receiverTopic);
+         fprintf(out, " |- serializer type = %s\n", serializerType);
+         for (const auto &url : connectedUrls) {
+             fprintf(out, " |- connected url = %s\n", url.c_str());
+         }
+         for (const auto &url : unconnectedUrls) {
+             fprintf(out, " |- unconnected url = %s\n", url.c_str());
+         }
+     }
+     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+     celixThreadMutex_unlock(&psa->serializers.mutex);
+     fprintf(out, "\n");
+
+     return CELIX_SUCCESS;
+ }
+
+#ifndef ANDROID
+ /**
+  * Looks up the numeric IPv4 address of a network interface.
+  *
+  * @param interface Name of the interface to use, or NULL to take the first interface
+  *                  that has an IPv4 address.
+  * @param ip        On success receives a newly allocated string with the address;
+  *                  the caller owns it and must free() it. Left untouched on failure.
+  * @return CELIX_SUCCESS when an address was found, CELIX_BUNDLE_EXCEPTION otherwise
+  *         (interface list not available, no matching IPv4 address, or out of memory).
+  */
+ static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
+     celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+     struct ifaddrs *ifaddr = NULL;
+     if (getifaddrs(&ifaddr) == -1) {
+         return status;
+     }
+
+     for (struct ifaddrs *ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) {
+         // Only IPv4 addresses are considered. The family is checked first so that getnameinfo
+         // is never handed a non-IPv4 socket address together with sizeof(struct sockaddr_in).
+         if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET) {
+             continue;
+         }
+         if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0) {
+             continue;
+         }
+
+         char host[NI_MAXHOST];
+         if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) {
+             char *copy = strdup(host);
+             if (copy != NULL) {
+                 *ip = copy;
+                 status = CELIX_SUCCESS;
+             }
+         }
+     }
+
+     freeifaddrs(ifaddr);
+
+     return status;
+ }
+#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
new file mode 100644
index 0000000..195ccb7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -0,0 +1,73 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+ // Include guard named after this header; the previous ZMQ-named guard would silently
+ // suppress this header in any translation unit that also includes a header using that name.
+ #ifndef CELIX_PUBSUB_NANOMSG_ADMIN_H
+ #define CELIX_PUBSUB_NANOMSG_ADMIN_H
+
+ #include "celix_api.h"
+ #include "log_helper.h"
+
+ // Admin type and endpoint property key used by this admin.
+ // NOTE(review): the values still read "zmq"/"zmq.url" — presumably left over from the ZMQ admin
+ // this code was derived from; confirm whether they should be nanomsg specific.
+ #define PUBSUB_NANOMSG_ADMIN_TYPE "zmq"
+ #define PUBSUB_NANOMSG_URL_KEY "zmq.url"
+
+ // Configuration property keys and defaults.
+ // NOTE(review): PSA_ZMQ_VERBOSE / PSA_ZMQ_NR_THREADS also carry ZMQ names — confirm.
+ #define PUBSUB_NANOMSG_VERBOSE_KEY "PSA_ZMQ_VERBOSE"
+ #define PUBSUB_NANOMSG_VERBOSE_DEFAULT true
+
+ #define PUBSUB_NANOMSG_PSA_IP_KEY "PSA_IP"
+ #define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE"
+ #define PUBSUB_NANOMSG_NR_THREADS_KEY "PSA_ZMQ_NR_THREADS"
+
+ #define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
+
+ // Opaque admin type; the definition lives in the implementation file.
+ typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+
+ // Life cycle of the admin. Declared outside the extern "C" block below.
+ pubsub_nanomsg_admin_t* pubsub_nanoMsgAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+ void pubsub_nanoMsgAdmin_destroy(pubsub_nanomsg_admin_t *psa);
+
+ // The functions below take the admin as an opaque `handle` and have C linkage.
+ #ifdef __cplusplus
+ extern "C" {
+ #endif
+ // Matching: report how well this admin fits a publisher, subscriber or endpoint.
+ celix_status_t pubsub_nanoMsgAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+                                                   double *score, long *serializerSvcId);
+ celix_status_t pubsub_nanoMsgAdmin_matchSubscriber(void *handle, long svcProviderBndId,
+                                                    const celix_properties_t *svcProperties, double *score,
+                                                    long *serializerSvcId);
+ celix_status_t pubsub_nanoMsgAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+
+ // Topic sender management.
+ celix_status_t pubsub_nanoMsgAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+                                                     long serializerSvcId, celix_properties_t **publisherEndpoint);
+ celix_status_t pubsub_nanoMsgAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+ // Topic receiver management.
+ celix_status_t pubsub_nanoMsgAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+                                                       long serializerSvcId, celix_properties_t **subscriberEndpoint);
+ celix_status_t pubsub_nanoMsgAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+
+ // Discovered endpoint bookkeeping.
+ celix_status_t pubsub_nanoMsgAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
+ celix_status_t pubsub_nanoMsgAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+
+ // Serializer service add/remove callbacks.
+ void pubsub_nanoMsgAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+ void pubsub_nanoMsgAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+ #ifdef __cplusplus
+ }
+ #endif
+
+ // Shell command entry point; writes an overview of senders and receivers to outStream.
+ celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+ #endif //CELIX_PUBSUB_NANOMSG_ADMIN_H
+
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
new file mode 100644
index 0000000..2a2bcfe
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -0,0 +1,56 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <memory.h>
+#include "pubsub_nanomsg_common.h"
+
+ /**
+  * Derives the message type id for a message type name.
+  *
+  * The id is the utils_stringHash value of msgType and is written to *msgTypeId.
+  * The handle is not used. Always returns 0.
+  */
+ int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle __attribute__((unused)), const char *msgType,
+                                          unsigned int *msgTypeId) {
+     const unsigned int typeHash = utils_stringHash(msgType);
+     *msgTypeId = typeHash;
+     return 0;
+ }
+
+ /**
+  * Checks whether a received message header is compatible with the expected version.
+  *
+  * Compatible means: the major versions are equal and the header's minor version is
+  * equal to or greater than the expected minor version (a compatible update). Both
+  * expected numbers are compared after truncation to unsigned char.
+  * Returns false when msgVersion is NULL.
+  */
+ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr) {
+     if (msgVersion == NULL) {
+         return false;
+     }
+
+     int expectedMajor = 0;
+     int expectedMinor = 0;
+     version_getMajor(msgVersion, &expectedMajor);
+     version_getMinor(msgVersion, &expectedMinor);
+
+     // A different major version means incompatible.
+     const bool sameMajor = hdr->major == static_cast<unsigned char>(expectedMajor);
+     return sameMajor && hdr->minor >= static_cast<unsigned char>(expectedMinor);
+ }
+
+ /**
+  * Builds the 5 byte subscription filter for a scope/topic pair.
+  *
+  * All 5 bytes of `filter` are cleared first. The first two characters of `scope` are
+  * then written to filter[0..1] and the first two characters of `topic` to filter[2..3],
+  * each only when that string is not NULL and at least two characters long.
+  * filter[4] always stays '\0'. `filter` must provide room for 5 bytes.
+  */
+ void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter) {
+     memset(filter, '\0', 5);
+
+     const bool scopeUsable = scope != NULL && strnlen(scope, 3) >= 2;
+     const bool topicUsable = topic != NULL && strnlen(topic, 3) >= 2;
+
+     if (scopeUsable) {
+         filter[0] = scope[0];
+         filter[1] = scope[1];
+     }
+     if (topicUsable) {
+         filter[2] = topic[0];
+         filter[3] = topic[1];
+     }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
new file mode 100644
index 0000000..28293a8
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -0,0 +1,56 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+ // Include guard named after this header; the previous ZMQ-named guard would silently
+ // suppress this header in any translation unit that also includes a header using that name.
+ #ifndef CELIX_PUBSUB_NANOMSG_COMMON_H
+ #define CELIX_PUBSUB_NANOMSG_COMMON_H
+
+ #include <utils.h>
+
+ #include "version.h"
+ #include "pubsub_common.h"
+
+
+ /*
+  * NOTE(review): the frame layout below was written for the zmq admin this code is derived
+  * from; confirm that it also describes how the nanomsg admin sends its messages:
+  * 1) A subscription filter.
+  * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'.
+  *
+  * 2) The message header (pubsub_nanmosg_msg_header_t) is sent, containing the type id and major/minor version
+  *
+  * 3) The actual payload
+  */
+
+
+ // NOTE(review): the struct tag still carries the zmq name; it is kept unchanged here.
+ struct pubsub_zmq_msg_header {
+     //header
+     unsigned int type;   // message type id, see psa_nanoMsg_localMsgTypeIdForMsgType
+     unsigned char major; // major version of the message
+     unsigned char minor; // minor version of the message
+ };
+
+ typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t;
+
+
+ // Writes the message type id for msgType to *msgTypeId.
+ int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
+ // Fills the 5 byte subscription filter from the first two chars of scope and topic.
+ void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter);
+
+ // Checks the header's major/minor version against msgVersion.
+ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr);
+
+
+ #endif //CELIX_PUBSUB_NANOMSG_COMMON_H
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
new file mode 100644
index 0000000..646a80e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -0,0 +1,420 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <mutex>
+#include <memory.h>
+#include <vector>
+#include <string>
+#include <sstream>
+
+#include <stdlib.h>
+#include <assert.h>
+
+#include <sys/epoll.h>
+#include <arpa/inet.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/bus.h>
+
+#include <pubsub_serializer.h>
+#include <pubsub/subscriber.h>
+#include <pubsub_constants.h>
+#include <pubsub_endpoint.h>
+#include <log_helper.h>
+
+#include "pubsub_nanomsg_topic_receiver.h"
+#include "pubsub_psa_nanomsg_constants.h"
+#include "pubsub_nanomsg_common.h"
+#include "pubsub_topology_manager.h"
+
+//Receive timeout passed to the socket as NN_RCVTIMEO (milliseconds according to the nanomsg documentation).
+//TODO see if block and wakeup (reset) also works
+#define PSA_NANOMSG_RECV_TIMEOUT 1000
+
+/*
+#define L_DEBUG(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+*/
+//The log helper based macros above are disabled; all log levels currently map directly to printf.
+#define L_DEBUG printf
+#define L_INFO printf
+#define L_WARN printf
+#define L_ERROR printf
+
+/*
+ * State of a single topic receiver: the nanomsg socket, the receive thread, the urls it was asked to
+ * connect to and the subscriber services it delivers messages to.
+ * NOTE(review): instances are allocated with calloc and released with free, so the std::mutex members are
+ * never constructed/destructed - confirm this is acceptable or switch to new/delete.
+ */
+struct pubsub_nanomsg_topic_receiver {
+ celix_bundle_context_t *ctx;
+ log_helper_t *logHelper;
+ long serializerSvcId;
+ pubsub_serializer_service_t *serializer;
+ char *scope; //owned copy, freed in destroy
+ char *topic; //owned copy, freed in destroy
+ char scopeAndTopicFilter[5]; //filled by psa_nanomsg_setScopeAndTopicFilter
+
+ int nanoMsgSocket; //socket created with nn_socket(AF_SP, NN_BUS)
+
+ struct {
+ celix_thread_t thread;
+ std::mutex mutex; //protects running
+ bool running; //set to false by destroy to request the receive thread to stop
+ } recvThread;
+
+ struct {
+ std::mutex mutex; //protects map
+ hash_map_t *map; //key = url, value = psa_nanomsg_requested_connection_entry_t*
+ } requestedConnections;
+
+ long subscriberTrackerId; //id of the service tracker for subscriber services
+ struct {
+ std::mutex mutex; //protects map
+ hash_map_t *map; //key = bnd id, value = psa_nanomsg_subscriber_entry_t*
+ } subscribers;
+};
+
+/* Bookkeeping for one url the receiver was asked to connect to. */
+typedef struct psa_zmq_requested_connection_entry {
+ char *url; //owned copy of the url; also used as the key in the requestedConnections map
+ bool connected; //true after nn_connect succeeded for this url
+ int id; //value returned by nn_connect, passed to nn_shutdown on disconnect
+} psa_nanomsg_requested_connection_entry_t;
+
+/* Bookkeeping for the subscriber service(s) of one bundle. */
+typedef struct psa_zmq_subscriber_entry {
+ int usageCount; //number of subscriber services added for the bundle; the entry is removed when it drops to 0
+ hash_map_t *msgTypes; //map from serializer svc
+ pubsub_subscriber_t *svc; //subscriber service of the first add for the bundle; receives the deserialized messages
+} psa_nanomsg_subscriber_entry_t;
+
+
+//Service tracker callback: called when a subscriber service is added (handle is the topic receiver).
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+//Service tracker callback: called when a subscriber service is removed (handle is the topic receiver).
+static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+ const celix_bundle_t *owner);
+//Entry point of the receive thread (data is the topic receiver).
+static void* psa_nanomsg_recvThread(void *data);
+
+
+/*
+ * Creates a topic receiver for the given scope/topic.
+ *
+ * A NN_BUS socket is opened with a receive timeout of PSA_NANOMSG_RECV_TIMEOUT, a service tracker for
+ * subscriber services matching the topic is started and the receive thread is launched.
+ *
+ * Returns the new receiver, or NULL when the receiver could not be allocated, the socket could not be
+ * created or the receive timeout could not be set. On failure nothing is leaked.
+ * The result must be released with pubsub_nanoMsgTopicReceiver_destroy.
+ */
+pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                                    log_helper_t *logHelper, const char *scope,
+                                                                    const char *topic, long serializerSvcId,
+                                                                    pubsub_serializer_service_t *serializer) {
+    //NOTE(review): the struct contains std::mutex members but is calloc'ed; their constructors never run.
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
+    if (receiver == NULL) {
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, out of memory", scope, topic);
+        return NULL;
+    }
+    receiver->ctx = ctx;
+    receiver->logHelper = logHelper;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
+    psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
+
+    receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
+    if (receiver->nanoMsgSocket < 0) {
+        free(receiver);
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic);
+        return NULL;
+    }
+
+    int timeout = PSA_NANOMSG_RECV_TIMEOUT;
+    if (nn_setsockopt(receiver->nanoMsgSocket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
+        //Stop here: the receiver must not be used after it is freed and the socket must not leak.
+        nn_close(receiver->nanoMsgSocket);
+        free(receiver);
+        L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic);
+        return NULL;
+    }
+
+    //TODO the scope/topic filter is not applied to the socket (the zmq subscribe call has no equivalent here yet)
+
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+
+    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    //LDAP filter "(<topic property>=<topic>)"; built as std::string to avoid a non-standard variable length array.
+    std::string topicFilter = std::string("(") + PUBSUB_SUBSCRIBER_TOPIC + "=" + topic + ")";
+    celix_service_tracking_options_t opts{};
+    opts.filter.ignoreServiceLanguage = true;
+    opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+    opts.filter.filter = topicFilter.c_str();
+    opts.callbackHandle = receiver;
+    opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber;
+    opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
+
+    receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    //running must be set before the thread starts, the thread checks it on every iteration
+    receiver->recvThread.running = true;
+    celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver);
+    std::stringstream namestream;
+    namestream << "NANOMSG TR " << scope << "/" << topic;
+    celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str());
+
+    return receiver;
+}
+
+/*
+ * Stops the receive thread and the subscriber tracker, releases all subscriber and connection entries,
+ * closes the socket and frees the receiver. Passing NULL is allowed and does nothing.
+ */
+void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) {
+    if (receiver == NULL) {
+        return;
+    }
+
+    //request the receive thread to stop and wait for it
+    {
+        std::lock_guard<std::mutex> guard(receiver->recvThread.mutex);
+        receiver->recvThread.running = false;
+    }
+    celixThread_join(receiver->recvThread.thread, NULL);
+
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+    //release the subscriber entries and their serializer maps
+    {
+        std::lock_guard<std::mutex> guard(receiver->subscribers.mutex);
+        hash_map_iterator_t subIter = hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&subIter)) {
+            void *value = hashMapIterator_nextValue(&subIter);
+            if (value == NULL) {
+                continue;
+            }
+            psa_nanomsg_subscriber_entry_t *sub = static_cast<psa_nanomsg_subscriber_entry_t*>(value);
+            receiver->serializer->destroySerializerMap(receiver->serializer->handle, sub->msgTypes);
+            free(sub);
+        }
+        hashMap_destroy(receiver->subscribers.map, false, false);
+    }
+
+    //release the requested connection entries
+    {
+        std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+        hash_map_iterator_t connIter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&connIter)) {
+            void *value = hashMapIterator_nextValue(&connIter);
+            if (value == NULL) {
+                continue;
+            }
+            psa_nanomsg_requested_connection_entry_t *conn = static_cast<psa_nanomsg_requested_connection_entry_t*>(value);
+            free(conn->url);
+            free(conn);
+        }
+        hashMap_destroy(receiver->requestedConnections.map, false, false);
+    }
+
+    nn_close(receiver->nanoMsgSocket);
+
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver);
+}
+
+/* Returns the scope of the receiver; the string stays owned by the receiver. */
+const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) {
+    const char *scope = receiver->scope;
+    return scope;
+}
+/* Returns the topic of the receiver; the string stays owned by the receiver. */
+const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) {
+    const char *topic = receiver->topic;
+    return topic;
+}
+
+/* Returns the service id of the serializer the receiver was created with. */
+long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) {
+    const long svcId = receiver->serializerSvcId;
+    return svcId;
+}
+
+/*
+ * Appends the url of every requested connection to connectedUrls or unconnectedUrls,
+ * depending on whether the connection is currently marked as connected.
+ */
+void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+                                                 std::vector<std::string> &connectedUrls,
+                                                 std::vector<std::string> &unconnectedUrls) {
+    std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(receiver->requestedConnections.map);
+         hashMapIterator_hasNext(&it);) {
+        psa_nanomsg_requested_connection_entry_t *conn =
+                static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&it));
+        std::vector<std::string> &target = conn->connected ? connectedUrls : unconnectedUrls;
+        target.emplace_back(conn->url);
+    }
+}
+
+
+/*
+ * Registers url as a requested connection (if not known yet) and, when it is not connected,
+ * tries to connect the socket to it. A failed connect is logged and the entry stays unconnected.
+ */
+void pubsub_nanoMsgTopicReceiver_connectTo(
+        pubsub_nanomsg_topic_receiver_t *receiver,
+        const char *url) {
+    L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url);
+
+    std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+    void *found = hashMap_get(receiver->requestedConnections.map, url);
+    psa_nanomsg_requested_connection_entry_t *conn = static_cast<psa_nanomsg_requested_connection_entry_t*>(found);
+    if (conn == NULL) {
+        //first request for this url: the entry owns a copy of the url, which is also the map key
+        conn = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*conn)));
+        conn->url = strndup(url, 1024*1024);
+        conn->connected = false;
+        hashMap_put(receiver->requestedConnections.map, (void*)conn->url, conn);
+    }
+    if (conn->connected) {
+        return;
+    }
+
+    const int endpointId = nn_connect(receiver->nanoMsgSocket, url);
+    if (endpointId < 0) {
+        L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)", url, strerror(errno));
+        return;
+    }
+    conn->connected = true;
+    conn->id = endpointId;
+}
+
+/*
+ * Removes url from the requested connections and, when it was connected, shuts the connection down.
+ * The entry is released even when the shutdown fails (the failure is only logged).
+ */
+void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) {
+    L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url);
+
+    std::lock_guard<std::mutex> guard(receiver->requestedConnections.mutex);
+    void *removed = hashMap_remove(receiver->requestedConnections.map, url);
+    if (removed == NULL) {
+        return;
+    }
+
+    psa_nanomsg_requested_connection_entry_t *conn = static_cast<psa_nanomsg_requested_connection_entry_t*>(removed);
+    if (conn->connected) {
+        if (nn_shutdown(receiver->nanoMsgSocket, conn->id) != 0) {
+            L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, conn->id, strerror(errno));
+        } else {
+            conn->connected = false;
+        }
+    }
+    free(conn->url);
+    free(conn);
+}
+
+/*
+ * Service tracker callback for an added subscriber service.
+ *
+ * Subscribers whose scope (property PUBSUB_SUBSCRIBER_SCOPE, "default" when absent) differs from the
+ * receiver scope are ignored. For the first subscriber of a bundle an entry with a serializer map is
+ * created; for additional subscribers of the same bundle only the usage count is increased.
+ */
+static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+
+    long bndId = celix_bundle_getId(bnd);
+    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    //Compare the complete strings: a prefix compare would accept e.g. scope "defaultX" for receiver scope "default".
+    if (strcmp(subScope, receiver->scope) != 0) {
+        //not the same scope. ignore
+        return;
+    }
+
+    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
+    psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId));
+    if (entry != NULL) {
+        entry->usageCount += 1;
+    } else {
+        //new create entry
+        entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry)));
+        entry->usageCount = 1;
+        entry->svc = static_cast<pubsub_subscriber_t*>(svc);
+
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        if (rc == 0) {
+            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+        } else {
+            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            free(entry);
+        }
+    }
+}
+
+/*
+ * Service tracker callback for a removed subscriber service.
+ * Decreases the usage count of the entry of the owning bundle and releases the entry
+ * (including its serializer map) when no subscriber of that bundle is left.
+ */
+static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/,
+                                                         const celix_properties_t */*props*/, const celix_bundle_t *bnd) {
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle);
+    const long ownerId = celix_bundle_getId(bnd);
+
+    std::lock_guard<std::mutex> guard(receiver->subscribers.mutex);
+    void *found = hashMap_get(receiver->subscribers.map, (void*)ownerId);
+    if (found == NULL) {
+        return;
+    }
+
+    psa_nanomsg_subscriber_entry_t *sub = static_cast<psa_nanomsg_subscriber_entry_t*>(found);
+    sub->usageCount -= 1;
+    if (sub->usageCount > 0) {
+        //still in use by another subscriber service of the same bundle
+        return;
+    }
+
+    //remove entry
+    hashMap_remove(receiver->subscribers.map, (void*)ownerId);
+    if (receiver->serializer->destroySerializerMap(receiver->serializer->handle, sub->msgTypes) != 0) {
+        L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic);
+    }
+    free(sub);
+}
+
+/*
+ * Delivers one received message to the subscriber of entry.
+ * The serializer is looked up by the type id in hdr; the message is only deserialized and delivered when
+ * the version check passes. The deserialized message is freed here only when the subscriber sets release.
+ */
+static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+    void *found = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
+    pubsub_msg_serializer_t *serializer = static_cast<pubsub_msg_serializer_t*>(found);
+    pubsub_subscriber_t *subscriber = entry->svc;
+
+    if (serializer == NULL) {
+        L_WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id %i", hdr->type);
+        return;
+    }
+    if (!psa_nanomsg_checkVersion(serializer->msgVersion, hdr)) {
+        //version not accepted: drop silently
+        return;
+    }
+
+    void *decoded = NULL;
+    celix_status_t status = serializer->deserialize(serializer, payload, payloadSize, &decoded);
+    if (status != CELIX_SUCCESS) {
+        L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->msgName, receiver->scope, receiver->topic);
+        return;
+    }
+
+    bool release = false;
+    subscriber->receive(subscriber->handle, serializer->msgName, serializer->msgId, decoded, NULL, &release);
+    if (release) {
+        serializer->freeMsg(serializer->handle, decoded);
+    }
+}
+
+/* Offers one received message to every registered subscriber entry, holding the subscribers lock. */
+static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
+    std::lock_guard<std::mutex> guard(receiver->subscribers.mutex);
+    for (hash_map_iterator_t it = hashMapIterator_construct(receiver->subscribers.map);
+         hashMapIterator_hasNext(&it);) {
+        void *value = hashMapIterator_nextValue(&it);
+        if (value == NULL) {
+            continue;
+        }
+        processMsgForSubscriberEntry(receiver, static_cast<psa_nanomsg_subscriber_entry_t*>(value), hdr, payload, payloadSize);
+    }
+}
+
+/*
+ * Receive thread: receives messages from the nanomsg socket and dispatches them to the subscribers
+ * until recvThread.running is set to false.
+ *
+ * data is the topic receiver. Every message is expected to start with a pubsub_nanmosg_msg_header_t
+ * followed by the serialized payload. The running flag is re-read (under its mutex) on every iteration;
+ * together with the receive timeout on the socket this lets destroy stop and join the thread.
+ * Always returns NULL.
+ */
+static void* psa_nanomsg_recvThread(void *data) {
+    pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data);
+
+    for (;;) {
+        {
+            std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex);
+            if (!receiver->recvThread.running) {
+                break;
+            }
+        }
+
+        //NN_MSG lets nanomsg allocate the buffer; payload receives its address and must be released with nn_freemsg
+        void *payload = nullptr;
+        nn_iovec iov;
+        iov.iov_base = &payload;
+        iov.iov_len = NN_MSG;
+
+        nn_msghdr msgHdr;
+        memset(&msgHdr, 0, sizeof(msgHdr));
+        msgHdr.msg_iov = &iov;
+        msgHdr.msg_iovlen = 1;
+        msgHdr.msg_control = nullptr;
+        msgHdr.msg_controllen = 0;
+
+        errno = 0;
+        int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0);
+        if (recvBytes < 0) {
+            if (errno == EAGAIN || errno == ETIMEDOUT) {
+                //receive timeout, nothing to do; loop to re-check the running flag
+            } else if (errno == EINTR) {
+                L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
+            } else {
+                L_WARN("[PSA_NANOMSG_TR] Error receiving nanomsg message: errno %d: %s\n", errno, strerror(errno));
+            }
+            continue;
+        }
+
+        if (payload != nullptr && static_cast<size_t>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) {
+            pubsub_nanmosg_msg_header_t *header = static_cast<pubsub_nanmosg_msg_header_t*>(payload);
+            char *msg = static_cast<char*>(payload) + sizeof(*header);
+            printf("HEADER: Type %d, major %d, minor %d\n", header->type , (int)header->major, (int)header->minor);
+            fprintf(stderr, "RECEIVED %d bytes\n", recvBytes);
+            //payload size is the received size minus the header struct (not minus the size of a pointer)
+            processMsg(receiver, header, msg, static_cast<size_t>(recvBytes) - sizeof(*header));
+        } else {
+            L_ERROR("[PSA_NANOMSG_TR] Error receiving nanomsg msg, size (%d) smaller than header\n", recvBytes);
+        }
+
+        if (payload != nullptr) {
+            //also release messages that were too short to process
+            nn_freemsg(payload);
+        }
+    } // for
+
+    return NULL;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/4fc1f3d3/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
new file mode 100644
index 0000000..786fb90
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
+#include <string>
+#include <vector>
+
+#include "celix_bundle_context.h"
+
+/* Opaque handle of a nanomsg topic receiver. */
+typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t;
+
+/*
+ * Creates a topic receiver for the given scope/topic using the given serializer service.
+ * Returns NULL when the receiver could not be created.
+ * NOTE(review): log_helper_t and pubsub_serializer_service_t are not declared by the includes of this
+ * header; it relies on the including file to provide them.
+ */
+pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx,
+ log_helper_t *logHelper, const char *scope,
+ const char *topic, long serializerSvcId,
+ pubsub_serializer_service_t *serializer);
+/* Stops and releases the receiver; receiver may be NULL. */
+void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver);
+
+/* Accessors for the scope and topic of the receiver; the returned strings stay owned by the receiver. */
+const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver);
+const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver);
+
+/* Returns the serializer service id passed to create. */
+long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver);
+/* Appends the urls of the requested connections to connectedUrls / unconnectedUrls. */
+void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver,
+ std::vector<std::string> &connectedUrls,
+ std::vector<std::string> &unconnectedUrls);
+
+/* Connects the receiver to url / disconnects it from url. */
+void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url);
+
+#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H