You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/01/30 19:30:23 UTC
[39/54] [abbrv] celix git commit: CELIX-417: Refactors cmake usage of
pubsub and rsa. Started with installing exported targets
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c b/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
deleted file mode 100644
index fe444bd..0000000
--- a/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.c
- *
- * \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include "zmq_crypto.h"
-
-#include <zmq.h>
-#include <openssl/conf.h>
-#include <openssl/evp.h>
-#include <openssl/err.h>
-
-#include <string.h>
-
-#define MAX_FILE_PATH_LENGTH 512
-#define ZMQ_KEY_LENGTH 40
-#define AES_KEY_LENGTH 32
-#define AES_IV_LENGTH 16
-
-#define KEY_TO_GET "aes_key"
-#define IV_TO_GET "aes_iv"
-
-static char* read_file_content(char* filePath, char* fileName);
-static void parse_key_lines(char *keysBuffer, char **key, char **iv);
-static void parse_key_line(char *line, char **key, char **iv);
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
-
-/**
- * Return a valid zcert_t from an encoded file
- * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
- */
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
-{
-
- if (keysFilePath == NULL){
- keysFilePath = DEFAULT_KEYS_FILE_PATH;
- }
-
- if (keysFileName == NULL){
- keysFileName = DEFAULT_KEYS_FILE_NAME;
- }
-
- char* keys_data = read_file_content(keysFilePath, keysFileName);
- if (keys_data == NULL){
- return NULL;
- }
-
- char *key = NULL;
- char *iv = NULL;
- parse_key_lines(keys_data, &key, &iv);
- free(keys_data);
-
- if (key == NULL || iv == NULL){
- free(key);
- free(iv);
-
- printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
- return NULL;
- }
-
- //At this point, we know an aes key and iv are stored and loaded
-
- // generate sha256 hashes
- unsigned char key_digest[EVP_MAX_MD_SIZE];
- unsigned char iv_digest[EVP_MAX_MD_SIZE];
- generate_sha256_hash((char*) key, key_digest);
- generate_sha256_hash((char*) iv, iv_digest);
-
- zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
- if (encoded_secret == NULL){
- free(key);
- free(iv);
-
- return NULL;
- }
-
- int encoded_secret_size = (int) zchunk_size (encoded_secret);
- char* encoded_secret_data = zchunk_strdup(encoded_secret);
- zchunk_destroy (&encoded_secret);
-
- // Decryption of data
- int decryptedtext_len;
- unsigned char decryptedtext[encoded_secret_size];
- decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
- decryptedtext[decryptedtext_len] = '\0';
-
- EVP_cleanup();
-
- free(encoded_secret_data);
- free(key);
- free(iv);
-
- // The public and private keys are retrieved
- char* public_text = NULL;
- char* secret_text = NULL;
-
- extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
-
- byte public_key [32] = { 0 };
- byte secret_key [32] = { 0 };
-
- zmq_z85_decode (public_key, public_text);
- zmq_z85_decode (secret_key, secret_text);
-
- zcert_t* cert_loaded = zcert_new_from(public_key, secret_key);
-
- free(public_text);
- free(secret_text);
-
- return cert_loaded;
-}
-
-int generate_sha256_hash(char* text, unsigned char* digest)
-{
- unsigned int digest_len;
-
- EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
- EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(mdctx, text, strlen(text));
- EVP_DigestFinal_ex(mdctx, digest, &digest_len);
- EVP_MD_CTX_free(mdctx);
-
- return digest_len;
-}
-
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
-{
- int len;
- int plaintext_len;
-
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
-
- EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
- plaintext_len = len;
- EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
- plaintext_len += len;
-
- EVP_CIPHER_CTX_free(ctx);
-
- return plaintext_len;
-}
-
-/**
- * Caller is responsible for freeing the returned value
- */
-static char* read_file_content(char* filePath, char* fileName){
-
- char fileNameWithPath[MAX_FILE_PATH_LENGTH];
- snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
- int rc = 0;
-
- if (!zsys_file_exists(fileNameWithPath)){
- printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
- return NULL;
- }
-
- zfile_t* keys_file = zfile_new (filePath, fileName);
- rc = zfile_input (keys_file);
- if (rc != 0){
- zfile_destroy(&keys_file);
- printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
- return NULL;
- }
-
- ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
- zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
- if (keys_chunk == NULL){
- zfile_close(keys_file);
- zfile_destroy(&keys_file);
- printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
- return NULL;
- }
-
- char* keys_data = zchunk_strdup(keys_chunk);
- zchunk_destroy(&keys_chunk);
- zfile_close(keys_file);
- zfile_destroy (&keys_file);
-
- return keys_data;
-}
-
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
- char *line = NULL, *saveLinePointer = NULL;
-
- bool firstTime = true;
- do {
- if (firstTime){
- line = strtok_r(keysBuffer, "\n", &saveLinePointer);
- firstTime = false;
- }else {
- line = strtok_r(NULL, "\n", &saveLinePointer);
- }
-
- if (line == NULL){
- break;
- }
-
- parse_key_line(line, key, iv);
-
- } while((*key == NULL || *iv == NULL) && line != NULL);
-
-}
-
-static void parse_key_line(char *line, char **key, char **iv){
- char *detectedKey = NULL, *detectedValue= NULL;
-
- char* sep_at = strchr(line, ':');
- if (sep_at == NULL){
- return;
- }
-
- *sep_at = '\0'; // overwrite first separator, creating two strings.
- detectedKey = line;
- detectedValue = sep_at + 1;
-
- if (detectedKey == NULL || detectedValue == NULL){
- return;
- }
- if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
- return;
- }
-
- if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
- *key = strndup(detectedValue, AES_KEY_LENGTH);
- } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
- *iv = strndup(detectedValue, AES_IV_LENGTH);
- }
-}
-
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
- // Load decrypted text buffer
- zchunk_t* secret_decrypted = zchunk_new(input, inputlen);
- if (secret_decrypted == NULL){
- printf("CRYPTO: Failed to create zchunk\n");
- return;
- }
-
- zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted);
- zchunk_destroy (&secret_decrypted);
- if (secret_config == NULL){
- printf("CRYPTO: Failed to create zconfig\n");
- return;
- }
-
- // Extract public and secret key from text buffer
- char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL);
- char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL);
-
- if (public_text == NULL || secret_text == NULL){
- zconfig_destroy(&secret_config);
- printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
- return;
- }
-
- *publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1);
- *secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
-
- zconfig_destroy(&secret_config);
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/psa_activator.c b/pubsub/pubsub_admin_zmq/src/psa_activator.c
new file mode 100644
index 0000000..fd07310
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -0,0 +1,142 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * psa_activator.c
+ *
+ * \date Sep 30, 2011
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_registration.h"
+#include "service_tracker.h"
+
+#include "pubsub_admin_impl.h"
+
+
+struct activator {
+ pubsub_admin_pt admin;
+ pubsub_admin_service_pt adminService;
+ service_registration_pt registration;
+ service_tracker_pt serializerTracker;
+};
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator;
+
+ activator = calloc(1, sizeof(*activator));
+ if (!activator) {
+ status = CELIX_ENOMEM;
+ }
+ else{
+ *userData = activator;
+
+ status = pubsubAdmin_create(context, &(activator->admin));
+
+ if(status == CELIX_SUCCESS){
+ service_tracker_customizer_pt customizer = NULL;
+ status = serviceTrackerCustomizer_create(activator->admin,
+ NULL,
+ pubsubAdmin_serializerAdded,
+ NULL,
+ pubsubAdmin_serializerRemoved,
+ &customizer);
+ if(status == CELIX_SUCCESS){
+ status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+ if(status != CELIX_SUCCESS){
+ serviceTrackerCustomizer_destroy(customizer);
+ pubsubAdmin_destroy(activator->admin);
+ }
+ }
+ else{
+ pubsubAdmin_destroy(activator->admin);
+ }
+ }
+ }
+
+ return status;
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+ pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc));
+
+ if (!pubsubAdminSvc) {
+ status = CELIX_ENOMEM;
+ }
+ else{
+ pubsubAdminSvc->admin = activator->admin;
+
+ pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
+ pubsubAdminSvc->removePublication = pubsubAdmin_removePublication;
+
+ pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
+ pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription;
+
+ pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
+ pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+
+ pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
+
+ activator->adminService = pubsubAdminSvc;
+
+ status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
+
+ status += serviceTracker_open(activator->serializerTracker);
+
+ }
+
+
+ return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+
+ status += serviceTracker_close(activator->serializerTracker);
+ status += serviceRegistration_unregister(activator->registration);
+
+ activator->registration = NULL;
+
+ free(activator->adminService);
+ activator->adminService = NULL;
+
+ return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+
+ serviceTracker_destroy(activator->serializerTracker);
+ pubsubAdmin_destroy(activator->admin);
+ activator->admin = NULL;
+
+ free(activator);
+
+ return status;
+}
+
+
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
new file mode 100644
index 0000000..29ead0c
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -0,0 +1,1040 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.c
+ *
+ * \date Sep 30, 2011
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include "pubsub_admin_impl.h"
+#include <zmq.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#ifndef ANDROID
+#include <ifaddrs.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "constants.h"
+#include "utils.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "bundle.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_helper.h"
+#include "log_service.h"
+#include "celix_threads.h"
+#include "service_factory.h"
+
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_utils.h"
+#include "subscriber.h"
+
+#define MAX_KEY_FOLDER_PATH_LENGTH 512
+
+static const char *DEFAULT_IP = "127.0.0.1";
+
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
+ celix_status_t status = CELIX_SUCCESS;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ if (!zsys_has_curve()){
+ printf("PSA_ZMQ: zeromq curve unsupported\n");
+ return CELIX_SERVICE_EXCEPTION;
+ }
+#endif
+
+ *admin = calloc(1, sizeof(**admin));
+
+ if (!*admin) {
+ status = CELIX_ENOMEM;
+ }
+ else{
+
+ const char *ip = NULL;
+ char *detectedIp = NULL;
+ (*admin)->bundle_context= context;
+ (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+ (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+ arrayList_create(&((*admin)->noSerializerSubscriptions));
+ arrayList_create(&((*admin)->noSerializerPublications));
+ arrayList_create(&((*admin)->serializerList));
+
+ celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+ celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+ celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+ celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+ celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+ celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
+ celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+ celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
+
+ celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+ celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+ celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
+ if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+ logHelper_start((*admin)->loghelper);
+ }
+
+ bundleContext_getProperty(context,PSA_IP , &ip);
+
+#ifndef ANDROID
+ if (ip == NULL) {
+ const char *interface = NULL;
+
+ bundleContext_getProperty(context, PSA_ITF, &interface);
+ if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface);
+ }
+
+ ip = detectedIp;
+ }
+#endif
+
+ if (ip != NULL) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
+ (*admin)->ipAddress = strdup(ip);
+ }
+ else {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP);
+ (*admin)->ipAddress = strdup(DEFAULT_IP);
+ }
+
+ if (detectedIp != NULL) {
+ free(detectedIp);
+ }
+
+ const char* basePortStr = NULL;
+ const char* maxPortStr = NULL;
+ char* endptrBase = NULL;
+ char* endptrMax = NULL;
+ bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
+ bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
+ (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
+ (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
+ if (*endptrBase != '\0') {
+ (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
+ }
+ if (*endptrMax != '\0') {
+ (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
+ }
+
+ printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
+
+ // Disable Signal Handling by CZMQ
+ setenv("ZSYS_SIGHANDLER", "false", true);
+
+ const char *nrZmqThreads = NULL;
+ bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
+
+ if(nrZmqThreads != NULL) {
+ char *endPtr = NULL;
+ unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10);
+ if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) {
+ zsys_set_io_threads(nrThreads);
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads);
+ printf("PSA_ZMQ: Using %d threads for ZMQ\n", nrThreads);
+ }
+ }
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ // Setup authenticator
+ zactor_t* auth = zactor_new (zauth, NULL);
+ zstr_sendx(auth, "VERBOSE", NULL);
+
+ // Load all public keys of subscribers into the application
+ // This step is done for authenticating subscribers
+ char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
+ char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
+ snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
+ zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
+ free(keys_bundle_dir);
+
+ (*admin)->zmq_auth = auth;
+#endif
+
+ }
+
+ return status;
+}
+
+
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+{
+ celix_status_t status = CELIX_SUCCESS;
+
+ free(admin->ipAddress);
+
+ celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+ hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
+ while(hashMapIterator_hasNext(iter)){
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ free((char*)hashMapEntry_getKey(entry));
+ arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
+ }
+ hashMapIterator_destroy(iter);
+ hashMap_destroy(admin->pendingSubscriptions,false,false);
+ celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+ hashMap_destroy(admin->subscriptions,false,false);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+ hashMap_destroy(admin->localPublications,true,false);
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+ celixThreadMutex_lock(&admin->externalPublicationsLock);
+ iter = hashMapIterator_create(admin->externalPublications);
+ while(hashMapIterator_hasNext(iter)){
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ free((char*)hashMapEntry_getKey(entry));
+ arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
+ }
+ hashMapIterator_destroy(iter);
+ hashMap_destroy(admin->externalPublications,false,false);
+ celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+ celixThreadMutex_lock(&admin->serializerListLock);
+ arrayList_destroy(admin->serializerList);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ arrayList_destroy(admin->noSerializerSubscriptions);
+ arrayList_destroy(admin->noSerializerPublications);
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+
+ celixThreadMutex_lock(&admin->usedSerializersLock);
+
+ iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
+ while(hashMapIterator_hasNext(iter)){
+ arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+ }
+ hashMapIterator_destroy(iter);
+ hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
+
+ iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
+ while(hashMapIterator_hasNext(iter)){
+ arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+ }
+ hashMapIterator_destroy(iter);
+ hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
+
+ celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+ celixThreadMutex_destroy(&admin->usedSerializersLock);
+ celixThreadMutex_destroy(&admin->serializerListLock);
+ celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+
+ celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
+ celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+
+ celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+ celixThreadMutex_destroy(&admin->subscriptionsLock);
+
+ celixThreadMutex_destroy(&admin->localPublicationsLock);
+ celixThreadMutex_destroy(&admin->externalPublicationsLock);
+
+ logHelper_stop(admin->loghelper);
+
+ logHelper_destroy(&admin->loghelper);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ if (admin->zmq_auth != NULL){
+ zactor_destroy(&(admin->zmq_auth));
+ }
+#endif
+
+ free(admin);
+
+ return status;
+}
+
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+ celix_status_t status = CELIX_SUCCESS;
+
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+
+ topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+
+ if(any_sub==NULL){
+
+ int i;
+ pubsub_serializer_service_t *best_serializer = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
+ status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
+ }
+ else{
+ printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ arrayList_add(admin->noSerializerSubscriptions,subEP);
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+
+ if (status == CELIX_SUCCESS){
+
+ /* Connect all internal publishers */
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+ hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
+ while(hashMapIterator_hasNext(lp_iter)){
+ service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
+ topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
+ array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+ if(topic_publishers!=NULL){
+ for(i=0;i<arrayList_size(topic_publishers);i++){
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+ if(pubEP->endpoint !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+ }
+ }
+ arrayList_destroy(topic_publishers);
+ }
+ }
+ hashMapIterator_destroy(lp_iter);
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+ /* Connect also all external publishers */
+ celixThreadMutex_lock(&admin->externalPublicationsLock);
+ hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
+ while(hashMapIterator_hasNext(extp_iter)){
+ array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
+ if(ext_pub_list!=NULL){
+ for(i=0;i<arrayList_size(ext_pub_list);i++){
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+ if(pubEP->endpoint !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+ }
+ }
+ }
+ }
+ hashMapIterator_destroy(extp_iter);
+ celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+
+ pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
+
+ status += pubsub_topicSubscriptionStart(any_sub);
+
+ }
+
+ if (status == CELIX_SUCCESS){
+ hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+ connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false);
+ }
+
+ }
+
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ return status;
+}
+
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+ celix_status_t status = CELIX_SUCCESS;
+
+ printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
+
+ if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
+ return pubsubAdmin_addAnySubscription(admin,subEP);
+ }
+
+ /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+ celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+ celixThreadMutex_lock(&admin->externalPublicationsLock);
+
+ char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+
+ service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+ array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+
+ if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
+ pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
+ }
+ else{
+ int i;
+ topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
+
+ if(subscription == NULL) {
+ pubsub_serializer_service_t *best_serializer = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
+ status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic, best_serializer, &subscription);
+ }
+ else{
+ printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ arrayList_add(admin->noSerializerSubscriptions,subEP);
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+
+ if (status==CELIX_SUCCESS){
+
+ /* Try to connect internal publishers */
+ if(factory!=NULL){
+ topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
+ array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+ if(topic_publishers!=NULL){
+ for(i=0;i<arrayList_size(topic_publishers);i++){
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+ if(pubEP->endpoint !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+ }
+ }
+ arrayList_destroy(topic_publishers);
+ }
+
+ }
+
+ /* Look also for external publishers */
+ if(ext_pub_list!=NULL){
+ for(i=0;i<arrayList_size(ext_pub_list);i++){
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+ if(pubEP->endpoint !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+ }
+ }
+ }
+
+ pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+
+ status += pubsub_topicSubscriptionStart(subscription);
+
+ }
+
+ if(status==CELIX_SUCCESS){
+
+ hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+
+ connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
+ }
+ }
+
+ if (status == CELIX_SUCCESS){
+ pubsub_topicIncreaseNrSubscribers(subscription);
+ }
+ }
+
+ free(scope_topic);
+ celixThreadMutex_unlock(&admin->externalPublicationsLock);
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+ celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+ return status;
+
+}
+
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+ celix_status_t status = CELIX_SUCCESS;
+
+ printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
+
+ char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+ topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+ if(sub!=NULL){
+ pubsub_topicDecreaseNrSubscribers(sub);
+ if(pubsub_topicGetNrSubscribers(sub) == 0) {
+ status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
+ }
+ }
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ if(sub==NULL){
+ /* Maybe the endpoint was pending */
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){
+ status = CELIX_ILLEGAL_STATE;
+ }
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+
+ free(scope_topic);
+
+
+
+ return status;
+
+}
+
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic);
+
+ const char* fwUUID = NULL;
+
+ bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+ if (fwUUID == NULL) {
+ printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+ return CELIX_INVALID_BUNDLE_CONTEXT;
+ }
+
+ char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
+ if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) {
+
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+
+ service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
+
+ if (factory == NULL) {
+ topic_publication_pt pub = NULL;
+ pubsub_serializer_service_t *best_serializer = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
+ status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+ }
+ else{
+ printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ arrayList_add(admin->noSerializerPublications,pubEP);
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+
+ if (status == CELIX_SUCCESS) {
+ status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+ if (status == CELIX_SUCCESS && factory != NULL) {
+ hashMap_put(admin->localPublications, strdup(scope_topic), factory);
+ connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
+ }
+ } else {
+ printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID);
+ }
+ } else {
+ //just add the new EP to the list
+ topic_publication_pt pub = (topic_publication_pt) factory->handle;
+ pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+ }
+
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+ }
+ else{
+
+ celixThreadMutex_lock(&admin->externalPublicationsLock);
+ array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
+ if (ext_pub_list == NULL) {
+ arrayList_create(&ext_pub_list);
+ hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
+ }
+
+ arrayList_add(ext_pub_list, pubEP);
+
+ celixThreadMutex_unlock(&admin->externalPublicationsLock);
+ }
+
+ /* Re-evaluate the pending subscriptions */
+ celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+
+ hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+ if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
+ char* topic = (char*) hashMapEntry_getKey(pendingSub);
+ array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
+ int i;
+ for (i = 0; i < arrayList_size(pendingSubList); i++) {
+ pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
+ pubsubAdmin_addSubscription(admin, subEP);
+ }
+ hashMap_remove(admin->pendingSubscriptions, scope_topic);
+ arrayList_clear(pendingSubList);
+ arrayList_destroy(pendingSubList);
+ free(topic);
+ }
+
+ celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+ /* Connect the new publisher to the subscription for his topic, if there is any */
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+
+ topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
+ if (sub != NULL && pubEP->endpoint != NULL) {
+ pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
+ }
+
+ /* And check also for ANY subscription */
+ topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+ if (any_sub != NULL && pubEP->endpoint != NULL) {
+ pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
+ }
+
+ free(scope_topic);
+
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ return status;
+
+}
+
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+ celix_status_t status = CELIX_SUCCESS;
+ int count = 0;
+
+ printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
+
+ const char* fwUUID = NULL;
+
+ bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+ if(fwUUID==NULL){
+ printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+ return CELIX_INVALID_BUNDLE_CONTEXT;
+ }
+ char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
+ if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
+
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+ service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+ if(factory!=NULL){
+ topic_publication_pt pub = (topic_publication_pt)factory->handle;
+ pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
+ }
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+ if(factory==NULL){
+ /* Maybe the endpoint was pending */
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+ status = CELIX_ILLEGAL_STATE;
+ }
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+ }
+ else{
+
+ celixThreadMutex_lock(&admin->externalPublicationsLock);
+ array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+ if(ext_pub_list!=NULL){
+ int i;
+ bool found = false;
+ for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
+ pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+ found = pubsubEndpoint_equals(pubEP,p);
+ if (found){
+ arrayList_remove(ext_pub_list,i);
+ }
+ }
+ // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
+ for(i=0; i<arrayList_size(ext_pub_list);i++) {
+ pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+ if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+ count++;
+ }
+ }
+
+ if(arrayList_size(ext_pub_list)==0){
+ hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+ char* topic = (char*)hashMapEntry_getKey(entry);
+ array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+ hashMap_remove(admin->externalPublications,topic);
+ arrayList_destroy(list);
+ free(topic);
+ }
+ }
+
+ celixThreadMutex_unlock(&admin->externalPublicationsLock);
+ }
+
+ /* Check if this publisher was connected to one of our subscribers*/
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+
+ topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+ if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
+ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+ }
+
+ /* And check also for ANY subscription */
+ topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+ if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
+ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
+ }
+
+ free(scope_topic);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ return status;
+
+}
+
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
+ celix_status_t status = CELIX_SUCCESS;
+
+ printf("PSA_ZMQ: Closing all publications\n");
+
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+ char *scope_topic = createScopeTopicKey(scope, topic);
+ hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
+ if(pubsvc_entry!=NULL){
+ char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
+ service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
+ topic_publication_pt pub = (topic_publication_pt)factory->handle;
+ status += pubsub_topicPublicationStop(pub);
+ disconnectTopicPubSubFromSerializer(admin, pub, true);
+ status += pubsub_topicPublicationDestroy(pub);
+ hashMap_remove(admin->localPublications,scope_topic);
+ free(key);
+ free(factory);
+ }
+ free(scope_topic);
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+ return status;
+
+}
+
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
+ celix_status_t status = CELIX_SUCCESS;
+
+ printf("PSA_ZMQ: Closing all subscriptions\n");
+
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+ char *scope_topic = createScopeTopicKey(scope, topic);
+ hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
+ if(sub_entry!=NULL){
+ char* topic = (char*)hashMapEntry_getKey(sub_entry);
+
+ topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry);
+ status += pubsub_topicSubscriptionStop(ts);
+ disconnectTopicPubSubFromSerializer(admin, ts, false);
+ status += pubsub_topicSubscriptionDestroy(ts);
+ hashMap_remove(admin->subscriptions,scope_topic);
+ free(topic);
+
+ }
+ free(scope_topic);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ return status;
+
+}
+
+
+#ifndef ANDROID
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+ struct ifaddrs *ifaddr, *ifa;
+ char host[NI_MAXHOST];
+
+ if (getifaddrs(&ifaddr) != -1)
+ {
+ for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
+ {
+ if (ifa->ifa_addr == NULL)
+ continue;
+
+ if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
+ if (interface == NULL) {
+ *ip = strdup(host);
+ status = CELIX_SUCCESS;
+ }
+ else if (strcmp(ifa->ifa_name, interface) == 0) {
+ *ip = strdup(host);
+ status = CELIX_SUCCESS;
+ }
+ }
+ }
+
+ freeifaddrs(ifaddr);
+ }
+
+ return status;
+}
+#endif
+
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+ celix_status_t status = CELIX_SUCCESS;
+ char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+ array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
+ if(pendingListPerTopic==NULL){
+ arrayList_create(&pendingListPerTopic);
+ hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
+ }
+ arrayList_add(pendingListPerTopic,subEP);
+ free(scope_topic);
+ return status;
+}
+
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
+ /* Assumption: serializers are all available at startup.
+ * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
+
+ celix_status_t status = CELIX_SUCCESS;
+ int i=0;
+
+ const char *serType = NULL;
+ serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+ if(serType == NULL){
+ printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
+ pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+ celixThreadMutex_lock(&admin->serializerListLock);
+ arrayList_add(admin->serializerList, reference);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+
+ /* Now let's re-evaluate the pending */
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+ for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
+ pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+ pubsub_serializer_service_t *best_serializer = NULL;
+ pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+ if(best_serializer != NULL){ /* Finally we have a valid serializer! */
+ pubsubAdmin_addSubscription(admin, ep);
+ }
+ }
+
+ for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
+ pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+ pubsub_serializer_service_t *best_serializer = NULL;
+ pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+ if(best_serializer != NULL){ /* Finally we have a valid serializer! */
+ pubsubAdmin_addPublication(admin, ep);
+ }
+ }
+
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+ printf("PSA_ZMQ: %s serializer added\n",serType);
+
+ return status;
+}
+
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
+
+ pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+ int i=0, j=0;
+ const char *serType = NULL;
+
+ serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+ if(serType == NULL){
+ printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
+ celixThreadMutex_lock(&admin->serializerListLock);
+ /* Remove the serializer from the list */
+ arrayList_removeElement(admin->serializerList, reference);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+
+
+ celixThreadMutex_lock(&admin->usedSerializersLock);
+ array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+ array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+ celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+ /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
+ if(topicPubList!=NULL){
+ for(i=0;i<arrayList_size(topicPubList);i++){
+ topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
+ /* Stop the topic publication */
+ pubsub_topicPublicationStop(topicPub);
+ /* Get the endpoints that are going to be orphan */
+ array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
+ for(j=0;j<arrayList_size(pubList);j++){
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
+ /* Remove the publication */
+ pubsubAdmin_removePublication(admin, pubEP);
+ /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
+ if(pubEP->endpoint!=NULL){
+ free(pubEP->endpoint);
+ pubEP->endpoint = NULL;
+ }
+ /* Add the orphan endpoint to the noSerializer pending list */
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ arrayList_add(admin->noSerializerPublications,pubEP);
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+ arrayList_destroy(pubList);
+
+ /* Cleanup also the localPublications hashmap*/
+ celixThreadMutex_lock(&admin->localPublicationsLock);
+ hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
+ char *key = NULL;
+ service_factory_pt factory = NULL;
+ while(hashMapIterator_hasNext(iter)){
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ factory = (service_factory_pt)hashMapEntry_getValue(entry);
+ topic_publication_pt pub = (topic_publication_pt)factory->handle;
+ if(pub==topicPub){
+ key = (char*)hashMapEntry_getKey(entry);
+ break;
+ }
+ }
+ hashMapIterator_destroy(iter);
+ if(key!=NULL){
+ hashMap_remove(admin->localPublications, key);
+ free(factory);
+ free(key);
+ }
+ celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+ /* Finally destroy the topicPublication */
+ pubsub_topicPublicationDestroy(topicPub);
+ }
+ arrayList_destroy(topicPubList);
+ }
+
+ /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
+ if(topicSubList!=NULL){
+ for(i=0;i<arrayList_size(topicSubList);i++){
+ topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
+ /* Stop the topic subscription */
+ pubsub_topicSubscriptionStop(topicSub);
+ /* Get the endpoints that are going to be orphan */
+ array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
+ for(j=0;j<arrayList_size(subList);j++){
+ pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
+ /* Remove the subscription */
+ pubsubAdmin_removeSubscription(admin, subEP);
+ /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
+ if(subEP->endpoint!=NULL){
+ free(subEP->endpoint);
+ subEP->endpoint = NULL;
+ }
+ /* Add the orphan endpoint to the noSerializer pending list */
+ celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+ arrayList_add(admin->noSerializerSubscriptions,subEP);
+ celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+ }
+
+ /* Cleanup also the subscriptions hashmap*/
+ celixThreadMutex_lock(&admin->subscriptionsLock);
+ hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
+ char *key = NULL;
+ while(hashMapIterator_hasNext(iter)){
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
+ if(sub==topicSub){
+ key = (char*)hashMapEntry_getKey(entry);
+ break;
+ }
+ }
+ hashMapIterator_destroy(iter);
+ if(key!=NULL){
+ hashMap_remove(admin->subscriptions, key);
+ free(key);
+ }
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+ /* Finally destroy the topicSubscription */
+ pubsub_topicSubscriptionDestroy(topicSub);
+ }
+ arrayList_destroy(topicSubList);
+ }
+
+
+
+ printf("PSA_ZMQ: %s serializer removed\n",serType);
+
+
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
+ celix_status_t status = CELIX_SUCCESS;
+
+ celixThreadMutex_lock(&admin->serializerListLock);
+ status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+
+ return status;
+}
+
+/* This one recall the same logic as in the match function */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
+
+ celix_status_t status = CELIX_SUCCESS;
+
+ celixThreadMutex_lock(&admin->serializerListLock);
+ status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+
+ return status;
+
+}
+
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+
+ celixThreadMutex_lock(&admin->usedSerializersLock);
+
+ hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+ array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
+ if(list==NULL){
+ arrayList_create(&list);
+ hashMap_put(map,serializer,list);
+ }
+ arrayList_add(list,topicPubSub);
+
+ celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}
+
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+
+ celixThreadMutex_lock(&admin->usedSerializersLock);
+
+ hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+ hash_map_iterator_pt iter = hashMapIterator_create(map);
+ while(hashMapIterator_hasNext(iter)){
+ array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter);
+ if(arrayList_removeElement(list, topicPubSub)){ //Found it!
+ break;
+ }
+ }
+ hashMapIterator_destroy(iter);
+
+ celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
new file mode 100644
index 0000000..03e8556
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
@@ -0,0 +1,109 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.h
+ *
+ * \date Dec 5, 2013
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_ZMQ_IMPL_H_
+#define PUBSUB_ADMIN_ZMQ_IMPL_H_
+
+#include <czmq.h>
+/* The following undefs prevent the collision between:
+ * - sys/syslog.h (which is included within czmq)
+ * - celix/dfi/dfi_log_util.h
+ */
+#undef LOG_DEBUG
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_WARNING
+
+#include "pubsub_admin.h"
+#include "pubsub_admin_match.h"
+#include "log_helper.h"
+
+#define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
+#define PSA_ZMQ_MAX_PORT "PSA_ZMQ_MAX_PORT"
+
+#define PSA_ZMQ_DEFAULT_BASE_PORT 5501
+#define PSA_ZMQ_DEFAULT_MAX_PORT 6000
+
+#define PUBSUB_ADMIN_TYPE "zmq"
+
+struct pubsub_admin {
+
+ bundle_context_pt bundle_context;
+ log_helper_pt loghelper;
+
+ /* List of the available serializers */
+ celix_thread_mutex_t serializerListLock; // List<serializers>
+ array_list_pt serializerList;
+
+ celix_thread_mutex_t localPublicationsLock;
+ hash_map_pt localPublications;//<topic(string),service_factory_pt>
+
+ celix_thread_mutex_t externalPublicationsLock;
+ hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
+
+ celix_thread_mutex_t subscriptionsLock;
+ hash_map_pt subscriptions; //<topic(string),topic_subscription>
+
+ celix_thread_mutex_t pendingSubscriptionsLock;
+ celix_thread_mutexattr_t pendingSubscriptionsAttr;
+ hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
+
+ /* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */
+ celix_thread_mutex_t noSerializerPendingsLock;
+ celix_thread_mutexattr_t noSerializerPendingsAttr;
+ array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+ array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+ celix_thread_mutex_t usedSerializersLock;
+ hash_map_pt topicSubscriptionsPerSerializer; // <serializer,List<topicSubscription>>
+ hash_map_pt topicPublicationsPerSerializer; // <serializer,List<topicPublications>>
+
+ char* ipAddress;
+
+ zactor_t* zmq_auth;
+
+ unsigned int basePort;
+ unsigned int maxPort;
+};
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic);
+
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+
+#endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_publication.c b/pubsub/pubsub_admin_zmq/src/topic_publication.c
new file mode 100644
index 0000000..e405866
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@ -0,0 +1,630 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <czmq.h>
+/* The following undefs prevent the collision between:
+ * - sys/syslog.h (which is included within czmq)
+ * - celix/dfi/dfi_log_util.h
+ */
+#undef LOG_DEBUG
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_WARNING
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_registration.h"
+#include "utils.h"
+#include "service_factory.h"
+#include "version.h"
+
+#include "pubsub_common.h"
+#include "pubsub_utils.h"
+#include "publisher.h"
+
+#include "topic_publication.h"
+
+#include "pubsub_serializer.h"
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ #include "zmq_crypto.h"
+
+ #define MAX_CERT_PATH_LENGTH 512
+#endif
+
+#define EP_ADDRESS_LEN 32
+#define ZMQ_BIND_MAX_RETRY 5
+
+#define FIRST_SEND_DELAY 2
+
+struct topic_publication {
+ zsock_t* zmq_socket;
+ celix_thread_mutex_t socket_lock; //Protects zmq_socket access
+ zcert_t * zmq_cert;
+ char* endpoint;
+ service_registration_pt svcFactoryReg;
+ array_list_pt pub_ep_list; //List<pubsub_endpoint>
+ hash_map_pt boundServices; //<bundle_pt,bound_service>
+ pubsub_serializer_service_t *serializer;
+ celix_thread_mutex_t tp_lock;
+};
+
+typedef struct publish_bundle_bound_service {
+ topic_publication_pt parent;
+ pubsub_publisher_t service;
+ bundle_pt bundle;
+ char *topic;
+ hash_map_pt msgTypes;
+ unsigned short getCount;
+ celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure
+ bool mp_send_in_progress;
+ array_list_pt mp_parts;
+}* publish_bundle_bound_service_pt;
+
+/* Note: correct locking order is
+ * 1. tp_lock
+ * 2. mp_lock
+ * 3. socket_lock
+ *
+ * tp_lock and socket_lock are independent.
+ */
+
+typedef struct pubsub_msg{
+ pubsub_msg_header_pt header;
+ char* payload;
+ int payloadSize;
+}* pubsub_msg_pt;
+
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
+static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
+
+static void delay_first_send_for_late_joiners(void);
+
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+ celix_status_t status = CELIX_SUCCESS;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ char* secure_topics = NULL;
+ bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
+
+ if (secure_topics){
+ array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
+
+ int i;
+ int secure_topics_size = arrayList_size(secure_topics_list);
+ for (i = 0; i < secure_topics_size; i++){
+ char* top = arrayList_get(secure_topics_list, i);
+ if (strcmp(pubEP->topic, top) == 0){
+ printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
+ pubEP->is_secure = true;
+ }
+ free(top);
+ top = NULL;
+ }
+
+ arrayList_destroy(secure_topics_list);
+ }
+
+ zcert_t* pub_cert = NULL;
+ if (pubEP->is_secure){
+ char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+ if (keys_bundle_dir == NULL){
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
+ const char* keys_file_path = NULL;
+ const char* keys_file_name = NULL;
+ bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+ bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+
+ char cert_path[MAX_CERT_PATH_LENGTH];
+
+ //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
+ snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
+ free(keys_bundle_dir);
+ printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
+
+ pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
+ if (pub_cert == NULL){
+ printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
+ printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
+ pubEP->is_secure = false;
+ }
+ }
+#endif
+
+ zsock_t* socket = zsock_new (ZMQ_PUB);
+ if(socket==NULL){
+ #ifdef BUILD_WITH_ZMQ_SECURITY
+ if (pubEP->is_secure){
+ zcert_destroy(&pub_cert);
+ }
+ #endif
+
+ perror("Error for zmq_socket");
+ return CELIX_SERVICE_EXCEPTION;
+ }
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ if (pubEP->is_secure){
+ zcert_apply (pub_cert, socket); // apply certificate to socket
+ zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
+ }
+#endif
+
+ int rv = -1, retry=0;
+ char* ep = malloc(EP_ADDRESS_LEN);
+ char bindAddress[EP_ADDRESS_LEN];
+
+ while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
+ /* Randomized part due to same bundle publishing on different topics */
+ unsigned int port = rand_range(basePort,maxPort);
+ memset(ep,0,EP_ADDRESS_LEN);
+ memset(bindAddress, 0, EP_ADDRESS_LEN);
+
+ snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
+ snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address
+ rv = zsock_bind (socket, "%s", bindAddress);
+ if (rv == -1) {
+ perror("Error for zmq_bind");
+ }
+ retry++;
+ }
+
+ if(rv == -1){
+ free(ep);
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
+ /* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */
+
+ topic_publication_pt pub = calloc(1,sizeof(*pub));
+
+ arrayList_create(&(pub->pub_ep_list));
+ pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
+ celixThreadMutex_create(&(pub->tp_lock),NULL);
+
+ pub->endpoint = ep;
+ pub->zmq_socket = socket;
+ pub->serializer = best_serializer;
+
+ celixThreadMutex_create(&(pub->socket_lock),NULL);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ if (pubEP->is_secure){
+ pub->zmq_cert = pub_cert;
+ }
+#endif
+
+ pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+
+ *out = pub;
+
+ return status;
+}
+
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
+ celix_status_t status = CELIX_SUCCESS;
+
+ celixThreadMutex_lock(&(pub->tp_lock));
+
+ free(pub->endpoint);
+ arrayList_destroy(pub->pub_ep_list);
+
+ hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
+ while(hashMapIterator_hasNext(iter)){
+ publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+ pubsub_destroyPublishBundleBoundService(bound);
+ }
+ hashMapIterator_destroy(iter);
+ hashMap_destroy(pub->boundServices,false,false);
+
+ pub->svcFactoryReg = NULL;
+ pub->serializer = NULL;
+#ifdef BUILD_WITH_ZMQ_SECURITY
+ zcert_destroy(&(pub->zmq_cert));
+#endif
+
+ celixThreadMutex_unlock(&(pub->tp_lock));
+
+ celixThreadMutex_destroy(&(pub->tp_lock));
+
+ celixThreadMutex_lock(&(pub->socket_lock));
+ zsock_destroy(&(pub->zmq_socket));
+ celixThreadMutex_unlock(&(pub->socket_lock));
+
+ celixThreadMutex_destroy(&(pub->socket_lock));
+
+ free(pub);
+
+ return status;
+}
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
+ celix_status_t status = CELIX_SUCCESS;
+
+ /* Let's register the new service */
+
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+
+ if(pubEP!=NULL){
+ service_factory_pt factory = calloc(1, sizeof(*factory));
+ factory->handle = pub;
+ factory->getService = pubsub_topicPublicationGetService;
+ factory->ungetService = pubsub_topicPublicationUngetService;
+
+ properties_pt props = properties_create();
+ properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
+ properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
+ properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
+
+ status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
+
+ if(status != CELIX_SUCCESS){
+ properties_destroy(props);
+ printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
+ }
+ else{
+ *svcFactory = factory;
+ }
+ }
+ else{
+ printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
+
+ return status;
+}
+
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
+ return serviceRegistration_unregister(pub->svcFactoryReg);
+}
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+
+ celixThreadMutex_lock(&(pub->tp_lock));
+ ep->endpoint = strdup(pub->endpoint);
+ arrayList_add(pub->pub_ep_list,ep);
+ celixThreadMutex_unlock(&(pub->tp_lock));
+
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+
+ celixThreadMutex_lock(&(pub->tp_lock));
+ for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
+ pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
+ if(pubsubEndpoint_equals(ep, e)) {
+ arrayList_removeElement(pub->pub_ep_list,ep);
+ break;
+ }
+ }
+ celixThreadMutex_unlock(&(pub->tp_lock));
+
+ return CELIX_SUCCESS;
+}
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
+ array_list_pt list = NULL;
+ celixThreadMutex_lock(&(pub->tp_lock));
+ list = arrayList_clone(pub->pub_ep_list);
+ celixThreadMutex_unlock(&(pub->tp_lock));
+ return list;
+}
+
+
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ topic_publication_pt publish = (topic_publication_pt)handle;
+
+ celixThreadMutex_lock(&(publish->tp_lock));
+
+ publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+ if(bound==NULL){
+ bound = pubsub_createPublishBundleBoundService(publish,bundle);
+ if(bound!=NULL){
+ hashMap_put(publish->boundServices,bundle,bound);
+ }
+ }
+ else{
+ bound->getCount++;
+ }
+
+ *service = &bound->service;
+
+ celixThreadMutex_unlock(&(publish->tp_lock));
+
+ return status;
+}
+
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
+
+ topic_publication_pt publish = (topic_publication_pt)handle;
+
+ celixThreadMutex_lock(&(publish->tp_lock));
+
+ publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+ if(bound!=NULL){
+
+ bound->getCount--;
+ if(bound->getCount==0){
+ pubsub_destroyPublishBundleBoundService(bound);
+ hashMap_remove(publish->boundServices,bundle);
+ }
+
+ }
+ else{
+ long bundleId = -1;
+ bundle_getBundleId(bundle,&bundleId);
+ printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
+ }
+
+ /* service should be never used for unget, so let's set the pointer to NULL */
+ *service = NULL;
+
+ celixThreadMutex_unlock(&(publish->tp_lock));
+
+ return CELIX_SUCCESS;
+}
+
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
+
+ bool ret = true;
+
+ zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
+ if (headerMsg == NULL) ret=false;
+ zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
+ if (payloadMsg == NULL) ret=false;
+
+ delay_first_send_for_late_joiners();
+
+ if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
+
+ if(!last){
+ if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
+ }
+ else{
+ if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false;
+ }
+
+ if (!ret){
+ zframe_destroy(&headerMsg);
+ zframe_destroy(&payloadMsg);
+ }
+
+ free(msg->header);
+ free(msg->payload);
+ free(msg);
+
+ return ret;
+
+}
+
+static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
+
+ bool ret = true;
+
+ unsigned int i = 0;
+ unsigned int mp_num = arrayList_size(mp_msg_parts);
+ for(;i<mp_num;i++){
+ ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
+ }
+ arrayList_clear(mp_msg_parts);
+
+ return ret;
+
+}
+
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
+
+ return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
+
+}
+
+static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
+
+ int status = 0;
+
+ publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+
+ celixThreadMutex_lock(&(bound->parent->tp_lock));
+ celixThreadMutex_lock(&(bound->mp_lock));
+ if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
+ printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
+ celixThreadMutex_unlock(&(bound->mp_lock));
+ celixThreadMutex_unlock(&(bound->parent->tp_lock));
+ return -3;
+ }
+
+ pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
+
+ if (msgSer!= NULL) {
+ int major=0, minor=0;
+
+ pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
+ strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
+ msg_hdr->type = msgTypeId;
+
+ if (msgSer->msgVersion != NULL){
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
+ msg_hdr->major = major;
+ msg_hdr->minor = minor;
+ }
+
+ void *serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+
+ pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+ msg->header = msg_hdr;
+ msg->payload = (char*)serializedOutput;
+ msg->payloadSize = serializedOutputLen;
+ bool snd = true;
+
+ switch(flags){
+ case PUBSUB_PUBLISHER_FIRST_MSG:
+ bound->mp_send_in_progress = true;
+ arrayList_add(bound->mp_parts,msg);
+ break;
+ case PUBSUB_PUBLISHER_PART_MSG:
+ if(!bound->mp_send_in_progress){
+ printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
+ status = -4;
+ }
+ else{
+ arrayList_add(bound->mp_parts,msg);
+ }
+ break;
+ case PUBSUB_PUBLISHER_LAST_MSG:
+ if(!bound->mp_send_in_progress){
+ printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
+ status = -4;
+ }
+ else{
+ arrayList_add(bound->mp_parts,msg);
+ snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
+ bound->mp_send_in_progress = false;
+ }
+ break;
+ case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case
+ snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
+ break;
+ default:
+ printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
+ status = -4;
+ break;
+ }
+
+ if(status==-4){
+ free(msg);
+ }
+
+ if(!snd){
+ printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
+ }
+
+ } else {
+ printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId);
+ status=-1;
+ }
+
+ celixThreadMutex_unlock(&(bound->mp_lock));
+ celixThreadMutex_unlock(&(bound->parent->tp_lock));
+
+ return status;
+
+}
+
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
+ *msgTypeId = utils_stringHash(msgType);
+ return 0;
+}
+
+
+static unsigned int rand_range(unsigned int min, unsigned int max){
+
+ double scaled = (double)(((double)random())/((double)RAND_MAX));
+ return (max-min+1)*scaled + min;
+
+}
+
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+
+ //PRECOND lock on tp->lock
+
+ publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+
+ if (bound != NULL) {
+
+ bound->parent = tp;
+ bound->bundle = bundle;
+ bound->getCount = 1;
+ bound->mp_send_in_progress = false;
+ celixThreadMutex_create(&bound->mp_lock,NULL);
+
+ if(tp->serializer != NULL){
+ tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+ }
+
+ arrayList_create(&bound->mp_parts);
+
+ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
+ bound->topic=strdup(pubEP->topic);
+
+ bound->service.handle = bound;
+ bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+ bound->service.send = pubsub_topicPublicationSend;
+ bound->service.sendMultipart = pubsub_topicPublicationSendMultipart;
+
+ }
+
+ return bound;
+}
+
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+
+ //PRECOND lock on tp->lock
+
+ celixThreadMutex_lock(&boundSvc->mp_lock);
+
+
+ if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
+ boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
+ }
+
+ if(boundSvc->mp_parts!=NULL){
+ arrayList_destroy(boundSvc->mp_parts);
+ }
+
+ if(boundSvc->topic!=NULL){
+ free(boundSvc->topic);
+ }
+
+ celixThreadMutex_unlock(&boundSvc->mp_lock);
+ celixThreadMutex_destroy(&boundSvc->mp_lock);
+
+ free(boundSvc);
+
+}
+
+static void delay_first_send_for_late_joiners(){
+
+ static bool firstSend = true;
+
+ if(firstSend){
+ printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
+ sleep(FIRST_SEND_DELAY);
+ firstSend = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_publication.h b/pubsub/pubsub_admin_zmq/src/topic_publication.h
new file mode 100644
index 0000000..3457263
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.h
@@ -0,0 +1,49 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ * \date Sep 24, 2015
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+typedef struct topic_publication *topic_publication_pt;
+
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */