You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rl...@apache.org on 2019/07/17 17:33:43 UTC
[celix] 03/03: Using celix_properties_t* in pubsub and updated formatting
This is an automated email from the ASF dual-hosted git repository.
rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 163735a3c0856e11786e61d0d25042315b3d95b6
Author: Roy Lenferink <le...@gmail.com>
AuthorDate: Wed Jul 17 19:23:59 2019 +0200
Using celix_properties_t* in pubsub and updated formatting
---
.../pubsub/examples/pubsub/common/include/poi.h | 28 +-
.../examples/pubsub/publisher/CMakeLists.txt | 16 +-
.../private/include/pubsub_publisher_private.h | 14 +-
.../publisher/private/src/pubsub_publisher.c | 171 ++++----
.../examples/pubsub/publisher2/CMakeLists.txt | 16 +-
.../examples/pubsub/subscriber/CMakeLists.txt | 16 +-
.../private/include/pubsub_subscriber_private.h | 6 +-
.../subscriber/private/src/pubsub_subscriber.c | 38 +-
bundles/pubsub/keygen/CMakeLists.txt | 26 +-
bundles/pubsub/keygen/ed_file.c | 450 ++++++++++-----------
bundles/pubsub/keygen/makecert.c | 41 +-
bundles/pubsub/mock/tst/pubsubmock_test.cc | 8 +-
bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt | 56 +--
.../src/pubsub_nanomsg_admin.cc | 2 +-
.../src/pubsub_nanomsg_admin.h | 2 +-
.../src/pubsub_nanomsg_common.cc | 2 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 4 +-
.../src/pubsub_nanomsg_topic_sender.cc | 6 +-
.../src/pubsub_psa_nanomsg_constants.h | 12 +-
bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 20 +-
bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt | 86 ++--
.../pubsub/pubsub_admin_zmq/src/psa_activator.c | 160 ++++----
.../src/pubsub_psa_zmq_constants.h | 16 +-
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 2 +-
.../pubsub_admin_zmq/src/pubsub_zmq_common.c | 2 +-
.../src/pubsub_zmq_topic_receiver.c | 12 +-
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 30 +-
bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c | 361 ++++++++---------
bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h | 4 +-
.../pubsub/pubsub_api/include/pubsub/publisher.h | 8 +-
.../pubsub/pubsub_api/include/pubsub/subscriber.h | 4 +-
bundles/pubsub/pubsub_discovery/CMakeLists.txt | 12 +-
.../pubsub/pubsub_discovery/src/psd_activator.c | 110 ++---
.../pubsub_discovery/src/pubsub_discovery_impl.c | 2 +-
.../pubsub_discovery/src/pubsub_discovery_impl.h | 60 +--
.../pubsub/pubsub_serializer_json/CMakeLists.txt | 8 +-
.../src/ps_json_serializer_activator.c | 38 +-
.../src/pubsub_serializer_impl.c | 439 ++++++++++----------
.../src/pubsub_serializer_impl.h | 2 +-
bundles/pubsub/pubsub_spi/CMakeLists.txt | 12 +-
bundles/pubsub/pubsub_spi/include/pubsub_admin.h | 40 +-
.../pubsub_spi/include/pubsub_admin_metrics.h | 96 ++---
.../pubsub/pubsub_spi/include/pubsub_constants.h | 2 +-
.../pubsub/pubsub_spi/include/pubsub_listeners.h | 12 +-
.../pubsub/pubsub_spi/include/pubsub_serializer.h | 26 +-
bundles/pubsub/pubsub_spi/include/pubsub_utils.h | 4 +-
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 216 +++++-----
bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 192 ++++-----
bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c | 346 ++++++++--------
.../pubsub/pubsub_topology_manager/CMakeLists.txt | 16 +-
.../pubsub_topology_manager/src/pstm_activator.c | 232 +++++------
.../src/pubsub_topology_manager.c | 6 +-
.../src/pubsub_topology_manager.h | 130 +++---
bundles/pubsub/test/test/sut_activator.c | 96 ++---
bundles/pubsub/test/test/tst_activator.cc | 12 +-
55 files changed, 1859 insertions(+), 1869 deletions(-)
diff --git a/bundles/pubsub/examples/pubsub/common/include/poi.h b/bundles/pubsub/examples/pubsub/common/include/poi.h
index c98007a..de2f196 100644
--- a/bundles/pubsub/examples/pubsub/common/include/poi.h
+++ b/bundles/pubsub/examples/pubsub/common/include/poi.h
@@ -20,33 +20,33 @@
* poi.h
*
* \date Nov 12, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#ifndef POI_H_
#define POI_H_
-#define MIN_LAT -90.0F
-#define MAX_LAT 90.0F
-#define MIN_LON -180.0F
-#define MAX_LON 180.0F
+#define MIN_LAT -90.0F
+#define MAX_LAT 90.0F
+#define MIN_LON -180.0F
+#define MAX_LON 180.0F
-#define MSG_POI_NAME "poi" //Has to match the message name in the msg descriptor!
+#define MSG_POI_NAME "poi" //Has to match the message name in the msg descriptor!
struct poi{
- double lat;
- double lon;
+ double lat;
+ double lon;
};
typedef struct poi1 poi1_t;
struct location{
- struct poi position;
- char* name;
- char* description;
- char* extra;
- char* data;
+ struct poi position;
+ char* name;
+ char* description;
+ char* extra;
+ char* data;
};
typedef struct location* location_t;
diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
index 00b528c..c68d533 100644
--- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@ -19,31 +19,31 @@ add_celix_bundle(celix_pubsub_poi_publisher
SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher"
VERSION "1.0.0"
SOURCES
- private/src/ps_pub_activator.c
- private/src/pubsub_publisher.c
+ private/src/ps_pub_activator.c
+ private/src/pubsub_publisher.c
)
target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api)
target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include)
celix_bundle_files(celix_pubsub_poi_publisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(celix_pubsub_poi_publisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
DESTINATION "META-INF/topics/pub"
)
celix_bundle_files(celix_pubsub_poi_publisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
DESTINATION "META-INF/keys"
)
celix_bundle_files(celix_pubsub_poi_publisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
DESTINATION "META-INF/keys/subscriber"
)
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
index d12ad3a..a124a9b 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
+++ b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
@@ -25,18 +25,18 @@
#include "pubsub/publisher.h"
struct pubsub_sender {
- array_list_pt trackers;
- const char *ident;
- hash_map_pt tid_map; //service -> tid
- long bundleId;
+ array_list_pt trackers;
+ const char *ident;
+ hash_map_pt tid_map; //service -> tid
+ long bundleId;
};
typedef struct pubsub_sender * pubsub_sender_pt;
typedef struct send_thread_struct{
- pubsub_publisher_pt service;
- pubsub_sender_pt publisher;
- const char *topic;
+ pubsub_publisher_pt service;
+ pubsub_sender_pt publisher;
+ const char *topic;
} *send_thread_struct_pt;
pubsub_sender_pt publisher_create(array_list_pt trackers, const char* ident,long bundleId);
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 8c685e7..de6f21b 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -20,8 +20,8 @@
* pubsub_publisher.c
*
* \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#include <stdlib.h>
@@ -38,132 +38,131 @@
static bool stop=false;
-static double randCoordinate(double min, double max){
+static double randCoordinate(double min, double max) {
- double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
+ double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
- return ret;
+ return ret;
}
-static void* send_thread(void* arg){
+static void* send_thread(void* arg) {
- send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
+ send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
- pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher;
+ pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
+ pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher;
- char fwUUID[9];
- memset(fwUUID,0,9);
- memcpy(fwUUID,publisher->ident,8);
+ char fwUUID[9];
+ memset(fwUUID,0,9);
+ memcpy(fwUUID,publisher->ident,8);
- //poi_t point = calloc(1,sizeof(*point));
- location_t place = calloc(1,sizeof(*place));
+ //poi_t point = calloc(1,sizeof(*point));
+ location_t place = calloc(1,sizeof(*place));
- char* desc = calloc(64,sizeof(char));
- snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+ char* desc = calloc(64,sizeof(char));
+ snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
- char* name = calloc(64,sizeof(char));
- snprintf(name,64,"Bundle#%ld",publisher->bundleId);
+ char* name = calloc(64,sizeof(char));
+ snprintf(name,64,"Bundle#%ld",publisher->bundleId);
- place->name = name;
- place->description = desc;
- place->extra = "extra value";
- printf("TOPIC : %s\n",st_struct->topic);
- unsigned int msgId = 0;
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0 ){
+ place->name = name;
+ place->description = desc;
+ place->extra = "extra value";
+ printf("TOPIC : %s\n",st_struct->topic);
+ unsigned int msgId = 0;
+ if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0) {
- while(stop==false){
- place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
- place->position.lon = randCoordinate(MIN_LON,MAX_LON);
- int nr_char = (int)randCoordinate(5,100000);
- place->data = calloc(nr_char, 1);
- for(int i = 0; i < (nr_char-1); i++) {
- place->data[i] = i%10 + '0';
- }
- place->data[nr_char-1] = '\0';
- if(publish_svc->send) {
- if(publish_svc->send(publish_svc->handle,msgId,place)==0){
- printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
- }
- } else {
- printf("No send for %s\n", st_struct->topic);
- }
+ while (stop == false) {
+ place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
+ place->position.lon = randCoordinate(MIN_LON,MAX_LON);
+ int nr_char = (int)randCoordinate(5,100000);
+ place->data = calloc(nr_char, 1);
+ for (int i = 0; i < (nr_char-1); i++) {
+ place->data[i] = i%10 + '0';
+ }
+ place->data[nr_char-1] = '\0';
+ if (publish_svc->send) {
+ if (publish_svc->send(publish_svc->handle,msgId,place) == 0) {
+ printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
+ }
+ } else {
+ printf("No send for %s\n", st_struct->topic);
+ }
- free(place->data);
- sleep(2);
- }
- }
- else{
- printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_POI_NAME);
- }
+ free(place->data);
+ sleep(2);
+ }
+ } else {
+ printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_POI_NAME);
+ }
- free(place->description);
- free(place->name);
- free(place);
+ free(place->description);
+ free(place->name);
+ free(place);
- free(st_struct);
+ free(st_struct);
- return NULL;
+ return NULL;
}
pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
- pubsub_sender_pt publisher = malloc(sizeof(*publisher));
+ pubsub_sender_pt publisher = malloc(sizeof(*publisher));
- publisher->trackers = trackers;
- publisher->ident = ident;
- publisher->bundleId = bundleId;
- publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
+ publisher->trackers = trackers;
+ publisher->ident = ident;
+ publisher->bundleId = bundleId;
+ publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
- return publisher;
+ return publisher;
}
void publisher_start(pubsub_sender_pt client) {
- printf("PUBLISHER: starting up...\n");
+ printf("PUBLISHER: starting up...\n");
}
void publisher_stop(pubsub_sender_pt client) {
- printf("PUBLISHER: stopping...\n");
+ printf("PUBLISHER: stopping...\n");
}
void publisher_destroy(pubsub_sender_pt client) {
- hashMap_destroy(client->tid_map, false, false);
- client->trackers = NULL;
- client->ident = NULL;
- free(client);
+ hashMap_destroy(client->tid_map, false, false);
+ client->trackers = NULL;
+ client->ident = NULL;
+ free(client);
}
void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) {
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)svc;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-
- printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
-
- send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
- data->service = publish_svc;
- data->publisher = manager;
- data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
- celix_thread_t *tid = malloc(sizeof(*tid));
- stop=false;
- celixThread_create(tid,NULL,send_thread,(void*)data);
- hashMap_put(manager->tid_map, publish_svc, tid);
+ pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)svc;
+ pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+
+ printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
+
+ send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
+ data->service = publish_svc;
+ data->publisher = manager;
+ data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
+ celix_thread_t *tid = malloc(sizeof(*tid));
+ stop=false;
+ celixThread_create(tid,NULL,send_thread,(void*)data);
+ hashMap_put(manager->tid_map, publish_svc, tid);
}
void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) {
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
- celix_thread_t *tid = hashMap_get(manager->tid_map, svc);
+ pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+ celix_thread_t *tid = hashMap_get(manager->tid_map, svc);
#if defined(__APPLE__) && defined(__MACH__)
- uint64_t threadid;
- pthread_threadid_np(tid->thread, &threadid);
- printf("PUBLISHER: publish service unexporting (%s) %llu!\n",manager->ident, threadid);
+ uint64_t threadid;
+ pthread_threadid_np(tid->thread, &threadid);
+ printf("PUBLISHER: publish service unexporting (%s) %llu!\n",manager->ident, threadid);
#else
- printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread);
+ printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread);
#endif
- stop=true;
- celixThread_join(*tid,NULL);
- free(tid);
+ stop=true;
+ celixThread_join(*tid,NULL);
+ free(tid);
}
diff --git a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
index e59d793..a29968b 100644
--- a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
@@ -19,32 +19,32 @@ add_celix_bundle(celix_pubsub_poi_publisher2
SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2"
VERSION "1.0.0"
SOURCES
- ../publisher/private/src/ps_pub_activator.c
- ../publisher/private/src/pubsub_publisher.c
+ ../publisher/private/src/ps_pub_activator.c
+ ../publisher/private/src/pubsub_publisher.c
)
target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework Celix::pubsub_api)
target_include_directories(celix_pubsub_poi_publisher2 PRIVATE ../publisher/private/include)
celix_bundle_files(celix_pubsub_poi_publisher2
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(celix_pubsub_poi_publisher2
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
DESTINATION "META-INF/topics/pub"
)
celix_bundle_files(celix_pubsub_poi_publisher2
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
DESTINATION "META-INF/keys"
)
celix_bundle_files(celix_pubsub_poi_publisher2
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
DESTINATION "META-INF/keys/subscriber"
)
diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
index 05afa04..94acdb2 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -19,8 +19,8 @@ add_celix_bundle(celix_pubsub_poi_subscriber
SYMBOLIC_NAME "apache_celix_pubsub_poi_subscriber"
VERSION "1.0.0"
SOURCES
- private/src/ps_sub_activator.c
- private/src/pubsub_subscriber.c
+ private/src/ps_sub_activator.c
+ private/src/pubsub_subscriber.c
)
target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api)
@@ -28,23 +28,23 @@ target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include)
celix_bundle_files(celix_pubsub_poi_subscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(celix_pubsub_poi_subscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
DESTINATION "META-INF/topics/sub"
)
celix_bundle_files(celix_pubsub_poi_subscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber
DESTINATION "META-INF/keys"
)
celix_bundle_files(celix_pubsub_poi_subscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher/public
+ ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher/public
DESTINATION "META-INF/keys/publisher"
)
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 6e14fa3..2aa93a6 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -20,8 +20,8 @@
* pubsub_subscriber_private.h
*
* \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
@@ -35,7 +35,7 @@
#include "pubsub/subscriber.h"
struct pubsub_receiver {
- char * name;
+ char *name;
};
typedef struct pubsub_receiver* pubsub_receiver_pt;
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 7b7ac1d..2303fea 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -20,8 +20,8 @@
* pubsub_subscriber.c
*
* \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#include <stdlib.h>
@@ -31,33 +31,33 @@
#include "pubsub_subscriber_private.h"
pubsub_receiver_pt subscriber_create(char* topics) {
- pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
- sub->name = strdup(topics);
- return sub;
+ pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
+ sub->name = strdup(topics);
+ return sub;
}
-void subscriber_start(pubsub_receiver_pt subscriber){
- printf("Subscriber started...\n");
+void subscriber_start(pubsub_receiver_pt subscriber) {
+ printf("Subscriber started...\n");
}
-void subscriber_stop(pubsub_receiver_pt subscriber){
- printf("Subscriber stopped...\n");
+void subscriber_stop(pubsub_receiver_pt subscriber) {
+ printf("Subscriber stopped...\n");
}
-void subscriber_destroy(pubsub_receiver_pt subscriber){
- if(subscriber->name!=NULL){
- free(subscriber->name);
- }
- subscriber->name=NULL;
- free(subscriber);
+void subscriber_destroy(pubsub_receiver_pt subscriber) {
+ if (subscriber->name != NULL) {
+ free(subscriber->name);
+ }
+ subscriber->name=NULL;
+ free(subscriber);
}
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
- location_t place = (location_t)msg;
- printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
+ location_t place = (location_t)msg;
+ printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
- return 0;
+ return 0;
}
diff --git a/bundles/pubsub/keygen/CMakeLists.txt b/bundles/pubsub/keygen/CMakeLists.txt
index bc42173..991648e 100644
--- a/bundles/pubsub/keygen/CMakeLists.txt
+++ b/bundles/pubsub/keygen/CMakeLists.txt
@@ -17,18 +17,18 @@
if (BUILD_ZMQ_SECURITY)
- find_package(ZMQ REQUIRED)
- find_package(CZMQ REQUIRED)
- find_package(OpenSSL 1.1.0 REQUIRED)
-
- include_directories("${ZMQ_INCLUDE_DIR}")
- include_directories("${CZMQ_INCLUDE_DIR}")
- include_directories("${OPENSSL_INCLUDE_DIR}")
-
- add_executable(makecert makecert.c)
- target_link_libraries(makecert ${CZMQ_LIBRARIES})
-
- add_executable(ed_file ed_file.c)
- target_link_libraries(ed_file ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
+ find_package(ZMQ REQUIRED)
+ find_package(CZMQ REQUIRED)
+ find_package(OpenSSL 1.1.0 REQUIRED)
+
+ include_directories("${ZMQ_INCLUDE_DIR}")
+ include_directories("${CZMQ_INCLUDE_DIR}")
+ include_directories("${OPENSSL_INCLUDE_DIR}")
+
+ add_executable(makecert makecert.c)
+ target_link_libraries(makecert ${CZMQ_LIBRARIES})
+
+ add_executable(ed_file ed_file.c)
+ target_link_libraries(ed_file ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
endif()
diff --git a/bundles/pubsub/keygen/ed_file.c b/bundles/pubsub/keygen/ed_file.c
index a6aa125..4cf1907 100644
--- a/bundles/pubsub/keygen/ed_file.c
+++ b/bundles/pubsub/keygen/ed_file.c
@@ -20,8 +20,8 @@
* ed_file.c
*
* \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#include <czmq.h>
@@ -49,261 +49,261 @@ static void parse_key_line(char *line, char **key, char **iv);
int main(int argc, const char* argv[])
{
- if (argc < 4){
- printf("Usage: %s <key_file> <input_file> <output_file> [options]\n", argv[0]);
- printf("Default behavior: encrypting a file\n");
- printf("Options:\n");
- printf("\t-d\tSpecify to decrypt a file\n");
- printf("\n");
- return EXIT_FAILURE;
- }
-
- int rc = 0;
-
- const char* keys_file_path = argv[1];
- const char* input_file_path = argv[2];
- const char* output_file_path = argv[3];
-
- bool decryptParam = false;
- if (argc > 4 && strcmp(argv[4], "-d") == 0){
- decryptParam = true;
- }
-
- if (!zsys_file_exists(keys_file_path)){
- printf("Keys file '%s' doesn't exist!\n", keys_file_path);
- return EXIT_FAILURE;
- }
-
- if (!zsys_file_exists(input_file_path)){
- printf("Input file does not exist!\n");
- return EXIT_FAILURE;
- }
-
- char* keys_data = read_keys_file_content(keys_file_path);
- if (keys_data == NULL){
- return EXIT_FAILURE;
- }
-
- char* key = NULL;
- char* iv = NULL;
- parse_key_lines(keys_data, &key, &iv);
- free(keys_data);
-
- if (key == NULL || iv == NULL){
- printf("Loading AES key and/or AES iv failed!\n");
- free(key);
- free(iv);
- return EXIT_FAILURE;
- }
-
- printf("Using AES Key \t\t'%s'\n", key);
- printf("Using AES IV \t\t'%s'\n", iv);
- printf("Input file path \t'%s'\n", input_file_path);
- printf("Output file path \t'%s'\n", output_file_path);
- printf("Decrypting \t\t'%s'\n\n", (decryptParam) ? "true" : "false");
-
- unsigned char key_digest[EVP_MAX_MD_SIZE];
- unsigned char iv_digest[EVP_MAX_MD_SIZE];
- generate_sha256_hash((char*) key, key_digest);
- generate_sha256_hash((char*) iv, iv_digest);
-
- zchunk_t* input_chunk = zchunk_slurp (input_file_path, 0);
- if (input_chunk == NULL){
- printf("Input file not correct!\n");
- free(key);
- free(iv);
- return EXIT_FAILURE;
- }
-
- //Load input data from file
- int input_file_size = (int) zchunk_size (input_chunk);
- char* input_file_data = zchunk_strdup(input_chunk);
- zchunk_destroy (&input_chunk);
-
- int output_len;
- unsigned char output[input_file_size];
- if (decryptParam){
- output_len = decrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
- output[output_len] = '\0';
- }else{
- output_len = encrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
- }
-
- //Write output data to file
- zfile_t* output_file = zfile_new (".", output_file_path);
- zchunk_t* output_chunk = zchunk_new(output, output_len);
- rc = zfile_output (output_file);
- if (rc != 0){
- printf("Problem with opening file for writing!\n");
- zchunk_destroy (&output_chunk);
- zfile_close (output_file);
- zfile_destroy (&output_file);
- free(input_file_data);
- free(key);
- free(iv);
-
- return EXIT_FAILURE;
- }
-
- rc = zfile_write (output_file, output_chunk, 0);
- if (rc != 0){
- printf("Problem with writing output to file!\n");
- }
- printf("Output written to file:\n");
- if (decryptParam){
- printf("%s\n", output);
- }else{
- BIO_dump_fp (stdout, (const char *) output, output_len);
- }
-
- zchunk_destroy (&output_chunk);
- zfile_close (output_file);
- zfile_destroy (&output_file);
- free(input_file_data);
- free(key);
- free(iv);
-
- return EXIT_SUCCESS;
+ if (argc < 4) {
+ printf("Usage: %s <key_file> <input_file> <output_file> [options]\n", argv[0]);
+ printf("Default behavior: encrypting a file\n");
+ printf("Options:\n");
+ printf("\t-d\tSpecify to decrypt a file\n");
+ printf("\n");
+ return EXIT_FAILURE;
+ }
+
+ int rc = 0;
+
+ const char* keys_file_path = argv[1];
+ const char* input_file_path = argv[2];
+ const char* output_file_path = argv[3];
+
+ bool decryptParam = false;
+ if (argc > 4 && strcmp(argv[4], "-d") == 0) {
+ decryptParam = true;
+ }
+
+ if (!zsys_file_exists(keys_file_path)) {
+ printf("Keys file '%s' doesn't exist!\n", keys_file_path);
+ return EXIT_FAILURE;
+ }
+
+ if (!zsys_file_exists(input_file_path)) {
+ printf("Input file does not exist!\n");
+ return EXIT_FAILURE;
+ }
+
+ char* keys_data = read_keys_file_content(keys_file_path);
+ if (keys_data == NULL) {
+ return EXIT_FAILURE;
+ }
+
+ char* key = NULL;
+ char* iv = NULL;
+ parse_key_lines(keys_data, &key, &iv);
+ free(keys_data);
+
+ if (key == NULL || iv == NULL) {
+ printf("Loading AES key and/or AES iv failed!\n");
+ free(key);
+ free(iv);
+ return EXIT_FAILURE;
+ }
+
+ printf("Using AES Key \t\t'%s'\n", key);
+ printf("Using AES IV \t\t'%s'\n", iv);
+ printf("Input file path \t'%s'\n", input_file_path);
+ printf("Output file path \t'%s'\n", output_file_path);
+ printf("Decrypting \t\t'%s'\n\n", (decryptParam) ? "true" : "false");
+
+ unsigned char key_digest[EVP_MAX_MD_SIZE];
+ unsigned char iv_digest[EVP_MAX_MD_SIZE];
+ generate_sha256_hash((char*) key, key_digest);
+ generate_sha256_hash((char*) iv, iv_digest);
+
+ zchunk_t* input_chunk = zchunk_slurp (input_file_path, 0);
+ if (input_chunk == NULL) {
+ printf("Input file not correct!\n");
+ free(key);
+ free(iv);
+ return EXIT_FAILURE;
+ }
+
+ //Load input data from file
+ int input_file_size = (int) zchunk_size (input_chunk);
+ char* input_file_data = zchunk_strdup(input_chunk);
+ zchunk_destroy (&input_chunk);
+
+ int output_len;
+ unsigned char output[input_file_size];
+ if (decryptParam) {
+ output_len = decrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
+ output[output_len] = '\0';
+ }else{
+ output_len = encrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
+ }
+
+ //Write output data to file
+ zfile_t* output_file = zfile_new (".", output_file_path);
+ zchunk_t* output_chunk = zchunk_new(output, output_len);
+ rc = zfile_output (output_file);
+ if (rc != 0) {
+ printf("Problem with opening file for writing!\n");
+ zchunk_destroy (&output_chunk);
+ zfile_close (output_file);
+ zfile_destroy (&output_file);
+ free(input_file_data);
+ free(key);
+ free(iv);
+
+ return EXIT_FAILURE;
+ }
+
+ rc = zfile_write (output_file, output_chunk, 0);
+ if (rc != 0) {
+ printf("Problem with writing output to file!\n");
+ }
+ printf("Output written to file:\n");
+ if (decryptParam) {
+ printf("%s\n", output);
+ }else{
+ BIO_dump_fp (stdout, (const char *) output, output_len);
+ }
+
+ zchunk_destroy (&output_chunk);
+ zfile_close (output_file);
+ zfile_destroy (&output_file);
+ free(input_file_data);
+ free(key);
+ free(iv);
+
+ return EXIT_SUCCESS;
}
int generate_sha256_hash(char* text, unsigned char* digest)
{
- unsigned int digest_len;
+ unsigned int digest_len;
- EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
- EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(mdctx, text, strlen(text));
- EVP_DigestFinal_ex(mdctx, digest, &digest_len);
- EVP_MD_CTX_free(mdctx);
+ EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+ EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(mdctx, text, strlen(text));
+ EVP_DigestFinal_ex(mdctx, digest, &digest_len);
+ EVP_MD_CTX_free(mdctx);
- return digest_len;
+ return digest_len;
}
int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext)
{
- int len;
- int ciphertext_len;
+ int len;
+ int ciphertext_len;
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+ EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
- EVP_EncryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, plaintext_len);
- ciphertext_len = len;
- EVP_EncryptFinal_ex(ctx, ciphertext + len, &len);
- ciphertext_len += len;
+ EVP_EncryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
+ EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, plaintext_len);
+ ciphertext_len = len;
+ EVP_EncryptFinal_ex(ctx, ciphertext + len, &len);
+ ciphertext_len += len;
- EVP_CIPHER_CTX_free(ctx);
+ EVP_CIPHER_CTX_free(ctx);
- return ciphertext_len;
+ return ciphertext_len;
}
int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
{
- int len;
- int plaintext_len;
+ int len;
+ int plaintext_len;
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+ EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
- EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
- plaintext_len = len;
- EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
- plaintext_len += len;
+ EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
+ EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
+ plaintext_len = len;
+ EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
+ plaintext_len += len;
- EVP_CIPHER_CTX_free(ctx);
+ EVP_CIPHER_CTX_free(ctx);
- return plaintext_len;
+ return plaintext_len;
}
-static char* read_keys_file_content(const char *keys_file_path){
- char* keys_file_full_path = strndup(keys_file_path, MAX_KEY_FILE_LENGTH);
- char* keys_file_name = NULL;
-
- char* sep_kf_at = strrchr(keys_file_path, '/');
- if (sep_kf_at != NULL){
- *sep_kf_at = '\0';
- keys_file_name = sep_kf_at + 1;
- }else{
- keys_file_name = (char*) keys_file_path;
- keys_file_path = (const char*) ".";
- }
-
- printf("Keys file path: %s\n", keys_file_full_path);
-
- int rc = 0;
-
- zfile_t* keys_file = zfile_new (keys_file_path, keys_file_name);
- rc = zfile_input (keys_file);
- if (rc != 0){
- printf("Keys file '%s' not readable!\n", keys_file_full_path);
- zfile_destroy(&keys_file);
- free(keys_file_full_path);
- return NULL;
- }
-
- ssize_t keys_file_size = zsys_file_size (keys_file_full_path);
- zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
- if (keys_chunk == NULL){
- printf("Can't read file '%s'!\n", keys_file_full_path);
- zfile_close(keys_file);
- zfile_destroy(&keys_file);
- free(keys_file_full_path);
- return NULL;
- }
-
- char* keys_data = zchunk_strdup(keys_chunk);
- zchunk_destroy(&keys_chunk);
- zfile_close(keys_file);
- zfile_destroy (&keys_file);
-
- return keys_data;
+static char* read_keys_file_content(const char *keys_file_path) {
+ char* keys_file_full_path = strndup(keys_file_path, MAX_KEY_FILE_LENGTH);
+ char* keys_file_name = NULL;
+
+ char* sep_kf_at = strrchr(keys_file_path, '/');
+ if (sep_kf_at != NULL) {
+ *sep_kf_at = '\0';
+ keys_file_name = sep_kf_at + 1;
+ }else{
+ keys_file_name = (char*) keys_file_path;
+ keys_file_path = (const char*) ".";
+ }
+
+ printf("Keys file path: %s\n", keys_file_full_path);
+
+ int rc = 0;
+
+ zfile_t* keys_file = zfile_new (keys_file_path, keys_file_name);
+ rc = zfile_input (keys_file);
+ if (rc != 0) {
+ printf("Keys file '%s' not readable!\n", keys_file_full_path);
+ zfile_destroy(&keys_file);
+ free(keys_file_full_path);
+ return NULL;
+ }
+
+ ssize_t keys_file_size = zsys_file_size (keys_file_full_path);
+ zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
+ if (keys_chunk == NULL) {
+ printf("Can't read file '%s'!\n", keys_file_full_path);
+ zfile_close(keys_file);
+ zfile_destroy(&keys_file);
+ free(keys_file_full_path);
+ return NULL;
+ }
+
+ char* keys_data = zchunk_strdup(keys_chunk);
+ zchunk_destroy(&keys_chunk);
+ zfile_close(keys_file);
+ zfile_destroy (&keys_file);
+
+ return keys_data;
}
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
- char *line = NULL, *saveLinePointer = NULL;
+static void parse_key_lines(char *keysBuffer, char **key, char **iv) {
+ char *line = NULL, *saveLinePointer = NULL;
- bool firstTime = true;
- do {
- if (firstTime){
- line = strtok_r(keysBuffer, "\n", &saveLinePointer);
- firstTime = false;
- }else {
- line = strtok_r(NULL, "\n", &saveLinePointer);
- }
+ bool firstTime = true;
+ do {
+ if (firstTime) {
+ line = strtok_r(keysBuffer, "\n", &saveLinePointer);
+ firstTime = false;
+ }else {
+ line = strtok_r(NULL, "\n", &saveLinePointer);
+ }
- if (line == NULL){
- break;
- }
+ if (line == NULL) {
+ break;
+ }
- parse_key_line(line, key, iv);
+ parse_key_line(line, key, iv);
- } while((*key == NULL || *iv == NULL) && line != NULL);
+ } while ((*key == NULL || *iv == NULL) && line != NULL);
}
-static void parse_key_line(char *line, char **key, char **iv){
- char *detectedKey = NULL, *detectedValue= NULL;
-
- char* sep_at = strchr(line, ':');
- if (sep_at == NULL){
- return;
- }
-
- *sep_at = '\0'; // overwrite first separator, creating two strings.
- detectedKey = line;
- detectedValue = sep_at + 1;
-
- if (detectedKey == NULL || detectedValue == NULL){
- return;
- }
- if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
- return;
- }
-
- if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
- *key = strndup(detectedValue, AES_KEY_LENGTH);
- } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
- *iv = strndup(detectedValue, AES_IV_LENGTH);
- }
+static void parse_key_line(char *line, char **key, char **iv) {
+ char *detectedKey = NULL, *detectedValue= NULL;
+
+ char* sep_at = strchr(line, ':');
+ if (sep_at == NULL) {
+ return;
+ }
+
+ *sep_at = '\0'; // overwrite first separator, creating two strings.
+ detectedKey = line;
+ detectedValue = sep_at + 1;
+
+ if (detectedKey == NULL || detectedValue == NULL) {
+ return;
+ }
+ if (detectedKey[0] == '\0' || detectedValue[0] == '\0') {
+ return;
+ }
+
+ if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0) {
+ *key = strndup(detectedValue, AES_KEY_LENGTH);
+ } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0) {
+ *iv = strndup(detectedValue, AES_IV_LENGTH);
+ }
}
diff --git a/bundles/pubsub/keygen/makecert.c b/bundles/pubsub/keygen/makecert.c
index 7552631..75ba5b5 100644
--- a/bundles/pubsub/keygen/makecert.c
+++ b/bundles/pubsub/keygen/makecert.c
@@ -20,36 +20,35 @@
* makecert.c
*
* \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#include <string.h>
#include "czmq.h"
-int main (int argc, const char * argv[])
-{
+int main (int argc, const char * argv[]) {
- const char * cert_name_public = "certificate.pub";
- const char * cert_name_secret = "certificate.key";
- if (argc == 3 && strcmp(argv[1], argv[2]) != 0){
- cert_name_public = argv[1];
- cert_name_secret = argv[2];
- }
+ const char * cert_name_public = "certificate.pub";
+ const char * cert_name_secret = "certificate.key";
+ if (argc == 3 && strcmp(argv[1], argv[2]) != 0) {
+ cert_name_public = argv[1];
+ cert_name_secret = argv[2];
+ }
- zcert_t * cert = zcert_new();
+ zcert_t * cert = zcert_new();
- char *timestr = zclock_timestr ();
- zcert_set_meta (cert, "date-created", timestr);
- free (timestr);
+ char *timestr = zclock_timestr ();
+ zcert_set_meta (cert, "date-created", timestr);
+ free (timestr);
- zcert_save_public(cert, cert_name_public);
- zcert_save_secret(cert, cert_name_secret);
- zcert_print (cert);
- printf("\n");
- printf("I: CURVE certificate created in %s and %s\n", cert_name_public, cert_name_secret);
- zcert_destroy (&cert);
+ zcert_save_public(cert, cert_name_public);
+ zcert_save_secret(cert, cert_name_secret);
+ zcert_print (cert);
+ printf("\n");
+ printf("I: CURVE certificate created in %s and %s\n", cert_name_public, cert_name_secret);
+ zcert_destroy (&cert);
- return 0;
+ return 0;
}
diff --git a/bundles/pubsub/mock/tst/pubsubmock_test.cc b/bundles/pubsub/mock/tst/pubsubmock_test.cc
index 116d877..f546bf3 100644
--- a/bundles/pubsub/mock/tst/pubsubmock_test.cc
+++ b/bundles/pubsub/mock/tst/pubsubmock_test.cc
@@ -35,8 +35,8 @@ static void* mockHandle = (void*)0x42;
TEST_GROUP(pubsubmock) {
void setup(void) {
- //setup mock
- pubsub_publisherMock_init(&mockSrv, mockHandle);
+ //setup mock
+ pubsub_publisherMock_init(&mockSrv, mockHandle);
}
void teardown() {
@@ -51,8 +51,8 @@ TEST(pubsubmock, publishermock) {
mock(PUBSUB_PUBLISHERMOCK_SCOPE).expectOneCall(PUBSUB_PUBLISHERMOCK_LOCAL_MSG_TYPE_ID_FOR_MSG_TYPE_METHOD)
.withParameter("handle", mockHandle)
- .withParameter("msgType", mockFqn)
- .withOutputParameterReturning("msgTypeId", &mockOutputTypeId, sizeof(mockOutputTypeId));
+ .withParameter("msgType", mockFqn)
+ .withOutputParameterReturning("msgTypeId", &mockOutputTypeId, sizeof(mockOutputTypeId));
mock(PUBSUB_PUBLISHERMOCK_SCOPE).expectOneCall(PUBSUB_PUBLISHERMOCK_SEND_METHOD)
.withParameter("handle", mockHandle)
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt b/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
index ab9806e..86a5000 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_nanomsg/CMakeLists.txt
@@ -17,35 +17,35 @@
if (BUILD_PUBSUB_PSA_NANOMSG)
- find_package(NanoMsg REQUIRED)
- find_package(Jansson REQUIRED)
+ find_package(NanoMsg REQUIRED)
+ find_package(Jansson REQUIRED)
- add_celix_bundle(celix_pubsub_admin_nanomsg
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_nanomsg"
- VERSION "1.0.0"
- GROUP "Celix/PubSub"
- SOURCES
- src/psa_nanomsg_activator.cc
- src/pubsub_nanomsg_admin.cc
- src/pubsub_nanomsg_topic_sender.cc
- src/pubsub_nanomsg_topic_receiver.cc
- src/pubsub_nanomsg_common.cc
- )
+ add_celix_bundle(celix_pubsub_admin_nanomsg
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_nanomsg"
+ VERSION "1.0.0"
+ GROUP "Celix/PubSub"
+ SOURCES
+ src/psa_nanomsg_activator.cc
+ src/pubsub_nanomsg_admin.cc
+ src/pubsub_nanomsg_topic_sender.cc
+ src/pubsub_nanomsg_topic_receiver.cc
+ src/pubsub_nanomsg_common.cc
+ )
- set_target_properties(celix_pubsub_admin_nanomsg PROPERTIES INSTALL_RPATH "$ORIGIN")
- target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE
- Celix::pubsub_spi
- Celix::framework Celix::dfi Celix::log_helper
- ${NANOMSG_LIBRARIES}
- )
- target_include_directories(celix_pubsub_admin_nanomsg PRIVATE
- ${NANOMSG_INCLUDE_DIR}
- ${JANSSON_INCLUDE_DIR}
- src
- ../pubsub_topology_manager/src
- )
+ set_target_properties(celix_pubsub_admin_nanomsg PROPERTIES INSTALL_RPATH "$ORIGIN")
+ target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE
+ Celix::pubsub_spi
+ Celix::framework Celix::dfi Celix::log_helper
+ ${NANOMSG_LIBRARIES}
+ )
+ target_include_directories(celix_pubsub_admin_nanomsg PRIVATE
+ ${NANOMSG_INCLUDE_DIR}
+ ${JANSSON_INCLUDE_DIR}
+ src
+ ../pubsub_topology_manager/src
+ )
- install_celix_bundle(celix_pubsub_admin_nanomsg EXPORT celix COMPONENT pubsub)
- target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE Celix::shell_api)
- add_library(Celix::pubsub_admin_nanomsg ALIAS celix_pubsub_admin_nanomsg)
+ install_celix_bundle(celix_pubsub_admin_nanomsg EXPORT celix COMPONENT pubsub)
+ target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE Celix::shell_api)
+ add_library(Celix::pubsub_admin_nanomsg ALIAS celix_pubsub_admin_nanomsg)
endif()
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 7e34838..27b264d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -236,7 +236,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
auto &entry = kvsm->second;
{
std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
- for(auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) {
+ for (auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) {
auto &sender = it->second;
if (entry.svcId == sender.getSerializerSvcId()) {
it = topicSenders.map.erase(it);
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 0a1615a..7fd35a6 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -38,7 +38,7 @@
#define PUBSUB_NANOMSG_VERBOSE_DEFAULT true
#define PUBSUB_NANOMSG_PSA_IP_KEY "PSA_IP"
-#define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE"
+#define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE"
#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1"
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
index 7e97bab..b791ee2 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -33,7 +33,7 @@ bool celix::pubsub::nanomsg::checkVersion(version_pt msgVersion, const celix::pu
if (msgVersion!=NULL) {
version_getMajor(msgVersion,&major);
version_getMinor(msgVersion,&minor);
- if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+ if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
}
}
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 031d1ba..dabf72e 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -124,7 +124,7 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
{
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- for(auto elem : subscribers.map) {
+ for (auto elem : subscribers.map) {
serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes);
}
subscribers.map.clear();
@@ -257,7 +257,7 @@ void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s
bool validVersion = celix::pubsub::nanomsg::checkVersion(msgSer->msgVersion, hdr);
if (validVersion) {
celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
- if(status == CELIX_SUCCESS) {
+ if (status == CELIX_SUCCESS) {
bool release = false;
svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
if (release) {
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index 10920bf..378199c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -64,7 +64,7 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_
}
int rv = -1, retry=0;
- while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
+ while (rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
/* Randomized part due to same bundle publishing on different topics */
unsigned int port = rand_range(_basePort,_maxPort);
std::stringstream _url;
@@ -252,14 +252,14 @@ static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper&
static bool firstSend = true;
- if(firstSend){
+ if (firstSend) {
logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
sleep(FIRST_SEND_DELAY_IN_SECONDS);
firstSend = false;
}
}
-static unsigned int rand_range(unsigned int min, unsigned int max){
+static unsigned int rand_range(unsigned int min, unsigned int max) {
double scaled = ((double)random())/((double)RAND_MAX);
return (unsigned int)((max-min+1)*scaled + min);
}
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
index 9b8882b..5632e1d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h
@@ -30,13 +30,13 @@
#define PSA_NANOMSG_DEFAULT_BASE_PORT 5501
#define PSA_NANOMSG_DEFAULT_MAX_PORT 6000
-#define PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE 30
-#define PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE 70
-#define PSA_NANOMSG_DEFAULT_SCORE 30
+#define PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE 30
+#define PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE 70
+#define PSA_NANOMSG_DEFAULT_SCORE 30
-#define PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY "PSA_NANOMSG_QOS_SAMPLE_SCORE"
-#define PSA_NANOMSG_QOS_CONTROL_SCORE_KEY "PSA_NANOMSG_QOS_CONTROL_SCORE"
-#define PSA_NANOMSG_DEFAULT_SCORE_KEY "PSA_NANOMSG_DEFAULT_SCORE"
+#define PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY "PSA_NANOMSG_QOS_SAMPLE_SCORE"
+#define PSA_NANOMSG_QOS_CONTROL_SCORE_KEY "PSA_NANOMSG_QOS_CONTROL_SCORE"
+#define PSA_NANOMSG_DEFAULT_SCORE_KEY "PSA_NANOMSG_DEFAULT_SCORE"
#endif /* PUBSUB_PSA_NANOMSG_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index e5ed16f..8b87422 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -18,20 +18,20 @@
find_package(Jansson REQUIRED)
add_celix_bundle(celix_pubsub_admin_udp_multicast
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
- VERSION "1.0.0"
- GROUP "Celix/PubSub"
- SOURCES
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
+ VERSION "1.0.0"
+ GROUP "Celix/PubSub"
+ SOURCES
src/psa_activator.c
- src/pubsub_udpmc_admin.c
- src/pubsub_udpmc_topic_sender.c
- src/pubsub_udpmc_topic_receiver.c
- src/pubsub_udpmc_common.c
+ src/pubsub_udpmc_admin.c
+ src/pubsub_udpmc_topic_sender.c
+ src/pubsub_udpmc_topic_receiver.c
+ src/pubsub_udpmc_common.c
src/large_udp.c
)
target_include_directories(celix_pubsub_admin_udp_multicast PRIVATE
- src
- ${JANSSON_INCLUDE_DIR}
+ src
+ ${JANSSON_INCLUDE_DIR}
)
set_target_properties(celix_pubsub_admin_udp_multicast PROPERTIES INSTALL_RPATH "$ORIGIN")
target_link_libraries(celix_pubsub_admin_udp_multicast PRIVATE Celix::pubsub_spi Celix::framework Celix::dfi Celix::log_helper Celix::utils)
diff --git a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
index e2d346b..bcf6199 100644
--- a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -17,54 +17,54 @@
if (BUILD_PUBSUB_PSA_ZMQ)
- find_package(ZMQ REQUIRED)
- find_package(CZMQ REQUIRED)
- find_package(Jansson REQUIRED)
- find_package(UUID REQUIRED)
+ find_package(ZMQ REQUIRED)
+ find_package(CZMQ REQUIRED)
+ find_package(Jansson REQUIRED)
+ find_package(UUID REQUIRED)
- if(NOT UUID_LIBRARY)
- #i.e. not found for OSX
- set(UUID_LIBRARY "")
- set(UUID_INCLUDE_DIRS "")
- endif()
+ if(NOT UUID_LIBRARY)
+ #i.e. not found for OSX
+ set(UUID_LIBRARY "")
+ set(UUID_INCLUDE_DIRS "")
+ endif()
- if (BUILD_ZMQ_SECURITY)
- add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1)
+ if (BUILD_ZMQ_SECURITY)
+ add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1)
- find_package(OpenSSL 1.1.0 REQUIRED)
- include_directories("${OPENSSL_INCLUDE_DIR}")
+ find_package(OpenSSL 1.1.0 REQUIRED)
+ include_directories("${OPENSSL_INCLUDE_DIR}")
- set (ZMQ_CRYPTO_C "src/zmq_crypto.c")
- endif()
+ set (ZMQ_CRYPTO_C "src/zmq_crypto.c")
+ endif()
- add_celix_bundle(celix_pubsub_admin_zmq
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
- VERSION "1.0.0"
- GROUP "Celix/PubSub"
- SOURCES
- src/psa_activator.c
- src/pubsub_zmq_admin.c
- src/pubsub_zmq_topic_sender.c
- src/pubsub_zmq_topic_receiver.c
- src/pubsub_zmq_common.c
- ${ZMQ_CRYPTO_C}
- )
+ add_celix_bundle(celix_pubsub_admin_zmq
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
+ VERSION "1.0.0"
+ GROUP "Celix/PubSub"
+ SOURCES
+ src/psa_activator.c
+ src/pubsub_zmq_admin.c
+ src/pubsub_zmq_topic_sender.c
+ src/pubsub_zmq_topic_receiver.c
+ src/pubsub_zmq_common.c
+ ${ZMQ_CRYPTO_C}
+ )
- set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
- target_link_libraries(celix_pubsub_admin_zmq PRIVATE
- Celix::pubsub_spi
- Celix::framework Celix::dfi Celix::log_helper Celix::utils
- ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}
- )
- target_include_directories(celix_pubsub_admin_zmq PRIVATE
- ${ZMQ_INCLUDE_DIR}
- ${CZMQ_INCLUDE_DIR}
- ${JANSSON_INCLUDE_DIR}
- ${UUID_INCLUDE_DIRS}
- src
- )
+ set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
+ target_link_libraries(celix_pubsub_admin_zmq PRIVATE
+ Celix::pubsub_spi
+ Celix::framework Celix::dfi Celix::log_helper Celix::utils
+ ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}
+ )
+ target_include_directories(celix_pubsub_admin_zmq PRIVATE
+ ${ZMQ_INCLUDE_DIR}
+ ${CZMQ_INCLUDE_DIR}
+ ${JANSSON_INCLUDE_DIR}
+ ${UUID_INCLUDE_DIRS}
+ src
+ )
- install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub)
- target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api)
- add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
+ install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub)
+ target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api)
+ add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
endif()
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 75b23a9..d1df6aa 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -29,99 +29,99 @@
#include "command.h"
typedef struct psa_zmq_activator {
- log_helper_t *logHelper;
+ log_helper_t *logHelper;
- pubsub_zmq_admin_t *admin;
+ pubsub_zmq_admin_t *admin;
- long serializersTrackerId;
+ long serializersTrackerId;
- pubsub_admin_service_t adminService;
- long adminSvcId;
+ pubsub_admin_service_t adminService;
+ long adminSvcId;
- pubsub_admin_metrics_service_t adminMetricsService;
- long adminMetricsSvcId;
+ pubsub_admin_metrics_service_t adminMetricsService;
+ long adminMetricsSvcId;
- command_service_t cmdSvc;
- long cmdSvcId;
+ command_service_t cmdSvc;
+ long cmdSvcId;
} psa_zmq_activator_t;
int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
- act->adminSvcId = -1L;
- act->cmdSvcId = -1L;
- act->serializersTrackerId = -1L;
-
- logHelper_create(ctx, &act->logHelper);
- logHelper_start(act->logHelper);
-
- act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
- celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
-
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
- //register pubsub admin service
- if (status == CELIX_SUCCESS) {
- pubsub_admin_service_t *psaSvc = &act->adminService;
- psaSvc->handle = act->admin;
- psaSvc->matchPublisher = pubsub_zmqAdmin_matchPublisher;
- psaSvc->matchSubscriber = pubsub_zmqAdmin_matchSubscriber;
- psaSvc->matchDiscoveredEndpoint = pubsub_zmqAdmin_matchDiscoveredEndpoint;
- psaSvc->setupTopicSender = pubsub_zmqAdmin_setupTopicSender;
- psaSvc->teardownTopicSender = pubsub_zmqAdmin_teardownTopicSender;
- psaSvc->setupTopicReceiver = pubsub_zmqAdmin_setupTopicReceiver;
- psaSvc->teardownTopicReceiver = pubsub_zmqAdmin_teardownTopicReceiver;
- psaSvc->addDiscoveredEndpoint = pubsub_zmqAdmin_addDiscoveredEndpoint;
- psaSvc->removeDiscoveredEndpoint = pubsub_zmqAdmin_removeDiscoveredEndpoint;
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
-
- act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
- }
-
- if (status == CELIX_SUCCESS) {
- act->adminMetricsService.handle = act->admin;
- act->adminMetricsService.metrics = pubsub_zmqAdmin_metrics;
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
-
- act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
- }
-
- //register shell command service
- {
- act->cmdSvc.handle = act->admin;
- act->cmdSvc.executeCommand = pubsub_zmqAdmin_executeCommand;
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_zmq");
- celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_zmq");
- celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
- act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
- }
-
- return status;
+ act->adminSvcId = -1L;
+ act->cmdSvcId = -1L;
+ act->serializersTrackerId = -1L;
+
+ logHelper_create(ctx, &act->logHelper);
+ logHelper_start(act->logHelper);
+
+ act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
+ celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+ //track serializers
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = act->admin;
+ opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
+ opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
+ act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //register pubsub admin service
+ if (status == CELIX_SUCCESS) {
+ pubsub_admin_service_t *psaSvc = &act->adminService;
+ psaSvc->handle = act->admin;
+ psaSvc->matchPublisher = pubsub_zmqAdmin_matchPublisher;
+ psaSvc->matchSubscriber = pubsub_zmqAdmin_matchSubscriber;
+ psaSvc->matchDiscoveredEndpoint = pubsub_zmqAdmin_matchDiscoveredEndpoint;
+ psaSvc->setupTopicSender = pubsub_zmqAdmin_setupTopicSender;
+ psaSvc->teardownTopicSender = pubsub_zmqAdmin_teardownTopicSender;
+ psaSvc->setupTopicReceiver = pubsub_zmqAdmin_setupTopicReceiver;
+ psaSvc->teardownTopicReceiver = pubsub_zmqAdmin_teardownTopicReceiver;
+ psaSvc->addDiscoveredEndpoint = pubsub_zmqAdmin_addDiscoveredEndpoint;
+ psaSvc->removeDiscoveredEndpoint = pubsub_zmqAdmin_removeDiscoveredEndpoint;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
+
+ act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+ }
+
+ if (status == CELIX_SUCCESS) {
+ act->adminMetricsService.handle = act->admin;
+ act->adminMetricsService.metrics = pubsub_zmqAdmin_metrics;
+
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
+
+ act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
+ }
+
+ //register shell command service
+ {
+ act->cmdSvc.handle = act->admin;
+ act->cmdSvc.executeCommand = pubsub_zmqAdmin_executeCommand;
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_zmq");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_zmq");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA");
+ act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ }
+
+ return status;
}
int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_unregisterService(ctx, act->adminSvcId);
- celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
- celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
- celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
- pubsub_zmqAdmin_destroy(act->admin);
+ celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+ celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
+ celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+ pubsub_zmqAdmin_destroy(act->admin);
- logHelper_stop(act->logHelper);
- logHelper_destroy(&act->logHelper);
+ logHelper_stop(act->logHelper);
+ logHelper_destroy(&act->logHelper);
- return CELIX_SUCCESS;
+ return CELIX_SUCCESS;
}
CELIX_GEN_BUNDLE_ACTIVATOR(psa_zmq_activator_t, psa_zmq_start, psa_zmq_stop);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
index d719e2f..e2665c0 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -17,8 +17,6 @@
* under the License.
*/
-
-
#ifndef PUBSUB_PSA_ZMQ_CONSTANTS_H_
#define PUBSUB_PSA_ZMQ_CONSTANTS_H_
@@ -29,13 +27,13 @@
#define PSA_ZMQ_DEFAULT_BASE_PORT 5501
#define PSA_ZMQ_DEFAULT_MAX_PORT 6000
-#define PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE 30
-#define PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE 70
-#define PSA_ZMQ_DEFAULT_SCORE 30
+#define PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE 30
+#define PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE 70
+#define PSA_ZMQ_DEFAULT_SCORE 30
-#define PSA_ZMQ_QOS_SAMPLE_SCORE_KEY "PSA_ZMQ_QOS_SAMPLE_SCORE"
-#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
-#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
+#define PSA_ZMQ_QOS_SAMPLE_SCORE_KEY "PSA_ZMQ_QOS_SAMPLE_SCORE"
+#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
+#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
#define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
@@ -49,7 +47,7 @@
#define PUBSUB_ZMQ_VERBOSE_DEFAULT true
#define PUBSUB_ZMQ_PSA_IP_KEY "PSA_IP"
-#define PUBSUB_ZMQ_PSA_ITF_KEY "PSA_INTERFACE"
+#define PUBSUB_ZMQ_PSA_ITF_KEY "PSA_INTERFACE"
#define PUBSUB_ZMQ_NR_THREADS_KEY "PSA_ZMQ_NR_THREADS"
#define PUBSUB_ZMQ_DEFAULT_IP "127.0.0.1"
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index acf8182..bb0890f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -243,7 +243,7 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_destroy(&psa->serializers.mutex);
hashMap_destroy(psa->serializers.map, false, false);
- if (psa->zmq_auth != NULL){
+ if (psa->zmq_auth != NULL) {
zactor_destroy(&psa->zmq_auth);
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index 18996e4..145798b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -38,7 +38,7 @@ bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *
if (msgVersion!=NULL) {
version_getMajor(msgVersion,&major);
version_getMinor(msgVersion,&minor);
- if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+ if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
}
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 95bac2d..24c3686 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -38,7 +38,7 @@
#define PSA_ZMQ_RECV_TIMEOUT 1000
#ifndef UUID_STR_LEN
-#define UUID_STR_LEN 37
+#define UUID_STR_LEN 37
#endif
@@ -145,7 +145,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
#ifdef BUILD_WITH_ZMQ_SECURITY
char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
- if (keys_bundle_dir == NULL){
+ if (keys_bundle_dir == NULL) {
return CELIX_SERVICE_EXCEPTION;
}
@@ -166,13 +166,13 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path);
zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path);
- if (sub_cert == NULL){
+ if (sub_cert == NULL) {
printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path);
return CELIX_SERVICE_EXCEPTION;
}
zcert_t* pub_cert = zcert_load(pub_cert_path);
- if (pub_cert == NULL){
+ if (pub_cert == NULL) {
zcert_destroy(&sub_cert);
printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path);
return CELIX_SERVICE_EXCEPTION;
@@ -665,7 +665,7 @@ pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topi
psa_zmq_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3);
result->msgTypes[i].typeId = metrics->msgTypeId;
pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)metrics->msgTypeId);
- if (msgSer) {
+ if (msgSer) {
snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin);
result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived;
@@ -700,7 +700,7 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (!entry->connected){
+ if (!entry->connected) {
if (zmq_connect(receiver->zmqSock, entry->url) == 0) {
entry->connected = true;
} else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index d245be6..8166d4c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -136,17 +136,17 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
//setting up zmq socket for ZMQ TopicSender
{
#ifdef BUILD_WITH_ZMQ_SECURITY
- char* secure_topics = NULL;
- bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
+ char *secure_topics = NULL;
+ bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
- if (secure_topics){
+ if (secure_topics) {
array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
int i;
int secure_topics_size = arrayList_size(secure_topics_list);
- for (i = 0; i < secure_topics_size; i++){
+ for (i = 0; i < secure_topics_size; i++) {
char* top = arrayList_get(secure_topics_list, i);
- if (strcmp(pubEP->topic, top) == 0){
+ if (strcmp(pubEP->topic, top) == 0) {
printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
pubEP->is_secure = true;
}
@@ -158,9 +158,9 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
}
zcert_t* pub_cert = NULL;
- if (pubEP->is_secure){
+ if (pubEP->is_secure) {
char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
- if (keys_bundle_dir == NULL){
+ if (keys_bundle_dir == NULL) {
return CELIX_SERVICE_EXCEPTION;
}
@@ -177,7 +177,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
- if (pub_cert == NULL){
+ if (pub_cert == NULL) {
printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
pubEP->is_secure = false;
@@ -189,16 +189,16 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
if (zmqSocket==NULL) {
#ifdef BUILD_WITH_ZMQ_SECURITY
if (pubEP->is_secure) {
- zcert_destroy(&pub_cert);
- }
+ zcert_destroy(&pub_cert);
+ }
#endif
perror("Error for zmq_socket");
}
#ifdef BUILD_WITH_ZMQ_SECURITY
if (pubEP->is_secure) {
- zcert_apply (pub_cert, socket); // apply certificate to socket
- zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
- }
+ zcert_apply (pub_cert, socket); // apply certificate to socket
+ zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
+ }
#endif
if (zmqSocket != NULL && staticBindUrl != NULL) {
@@ -624,14 +624,14 @@ static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender)
static bool firstSend = true;
- if(firstSend){
+ if (firstSend) {
L_INFO("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
sleep(FIRST_SEND_DELAY_IN_SECONDS);
firstSend = false;
}
}
-static unsigned int rand_range(unsigned int min, unsigned int max){
+static unsigned int rand_range(unsigned int min, unsigned int max) {
double scaled = ((double)random())/((double)RAND_MAX);
return (unsigned int)((max-min+1)*scaled + min);
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
index 5ec7be1..5a3f176 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
@@ -20,8 +20,8 @@
* zmq_crypto.c
*
* \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#include "zmq_crypto.h"
@@ -50,232 +50,229 @@ static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **
* Return a valid zcert_t from an encoded file
* Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
*/
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
-{
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path) {
- if (keysFilePath == NULL){
- keysFilePath = DEFAULT_KEYS_FILE_PATH;
- }
+ if (keysFilePath == NULL) {
+ keysFilePath = DEFAULT_KEYS_FILE_PATH;
+ }
- if (keysFileName == NULL){
- keysFileName = DEFAULT_KEYS_FILE_NAME;
- }
+ if (keysFileName == NULL) {
+ keysFileName = DEFAULT_KEYS_FILE_NAME;
+ }
- char* keys_data = read_file_content(keysFilePath, keysFileName);
- if (keys_data == NULL){
- return NULL;
- }
+ char* keys_data = read_file_content(keysFilePath, keysFileName);
+ if (keys_data == NULL) {
+ return NULL;
+ }
- char *key = NULL;
- char *iv = NULL;
- parse_key_lines(keys_data, &key, &iv);
- free(keys_data);
+ char *key = NULL;
+ char *iv = NULL;
+ parse_key_lines(keys_data, &key, &iv);
+ free(keys_data);
- if (key == NULL || iv == NULL){
- free(key);
- free(iv);
+ if (key == NULL || iv == NULL) {
+ free(key);
+ free(iv);
- printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
- return NULL;
- }
+ printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
+ return NULL;
+ }
- //At this point, we know an aes key and iv are stored and loaded
+ //At this point, we know an aes key and iv are stored and loaded
- // generate sha256 hashes
- unsigned char key_digest[EVP_MAX_MD_SIZE];
- unsigned char iv_digest[EVP_MAX_MD_SIZE];
- generate_sha256_hash((char*) key, key_digest);
- generate_sha256_hash((char*) iv, iv_digest);
+ // generate sha256 hashes
+ unsigned char key_digest[EVP_MAX_MD_SIZE];
+ unsigned char iv_digest[EVP_MAX_MD_SIZE];
+ generate_sha256_hash((char*) key, key_digest);
+ generate_sha256_hash((char*) iv, iv_digest);
- zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
- if (encoded_secret == NULL){
- free(key);
- free(iv);
+ zchunk_t *encoded_secret = zchunk_slurp(file_path, 0);
+ if (encoded_secret == NULL) {
+ free(key);
+ free(iv);
- return NULL;
- }
+ return NULL;
+ }
- int encoded_secret_size = (int) zchunk_size (encoded_secret);
- char* encoded_secret_data = zchunk_strdup(encoded_secret);
- zchunk_destroy (&encoded_secret);
+ int encoded_secret_size = (int) zchunk_size(encoded_secret);
+ char *encoded_secret_data = zchunk_strdup(encoded_secret);
+ zchunk_destroy(&encoded_secret);
- // Decryption of data
- int decryptedtext_len;
- unsigned char decryptedtext[encoded_secret_size];
- decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
- decryptedtext[decryptedtext_len] = '\0';
+ // Decryption of data
+ int decryptedtext_len;
+ unsigned char decryptedtext[encoded_secret_size];
+ decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
+ decryptedtext[decryptedtext_len] = '\0';
- EVP_cleanup();
+ EVP_cleanup();
- free(encoded_secret_data);
- free(key);
- free(iv);
+ free(encoded_secret_data);
+ free(key);
+ free(iv);
- // The public and private keys are retrieved
- char* public_text = NULL;
- char* secret_text = NULL;
+ // The public and private keys are retrieved
+ char *public_text = NULL;
+ char *secret_text = NULL;
- extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
+ extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
- byte public_key [32] = { 0 };
- byte secret_key [32] = { 0 };
+ byte public_key [32] = { 0 };
+ byte secret_key [32] = { 0 };
- zmq_z85_decode (public_key, public_text);
- zmq_z85_decode (secret_key, secret_text);
+ zmq_z85_decode(public_key, public_text);
+ zmq_z85_decode(secret_key, secret_text);
- zcert_t* cert_loaded = zcert_new_from(public_key, secret_key);
+ zcert_t *cert_loaded = zcert_new_from(public_key, secret_key);
- free(public_text);
- free(secret_text);
+ free(public_text);
+ free(secret_text);
- return cert_loaded;
+ return cert_loaded;
}
-int generate_sha256_hash(char* text, unsigned char* digest)
-{
- unsigned int digest_len;
+int generate_sha256_hash(char* text, unsigned char* digest) {
+ unsigned int digest_len;
- EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
- EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(mdctx, text, strlen(text));
- EVP_DigestFinal_ex(mdctx, digest, &digest_len);
- EVP_MD_CTX_free(mdctx);
+ EVP_MD_CTX *mdctx = EVP_MD_CTX_new();
+ EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
+ EVP_DigestUpdate(mdctx, text, strlen(text));
+ EVP_DigestFinal_ex(mdctx, digest, &digest_len);
+ EVP_MD_CTX_free(mdctx);
- return digest_len;
+ return digest_len;
}
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
-{
- int len;
- int plaintext_len;
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext) {
+ int len;
+ int plaintext_len;
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+ EVP_CIPHER_CTX *ctx = EVP_CIPHER_CTX_new();
- EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
- plaintext_len = len;
- EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
- plaintext_len += len;
+ EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
+ EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
+ plaintext_len = len;
+ EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
+ plaintext_len += len;
- EVP_CIPHER_CTX_free(ctx);
+ EVP_CIPHER_CTX_free(ctx);
- return plaintext_len;
+ return plaintext_len;
}
/**
* Caller is responsible for freeing the returned value
*/
-static char* read_file_content(char* filePath, char* fileName){
-
- char fileNameWithPath[MAX_FILE_PATH_LENGTH];
- snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
- int rc = 0;
-
- if (!zsys_file_exists(fileNameWithPath)){
- printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
- return NULL;
- }
-
- zfile_t* keys_file = zfile_new (filePath, fileName);
- rc = zfile_input (keys_file);
- if (rc != 0){
- zfile_destroy(&keys_file);
- printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
- return NULL;
- }
-
- ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
- zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
- if (keys_chunk == NULL){
- zfile_close(keys_file);
- zfile_destroy(&keys_file);
- printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
- return NULL;
- }
-
- char* keys_data = zchunk_strdup(keys_chunk);
- zchunk_destroy(&keys_chunk);
- zfile_close(keys_file);
- zfile_destroy (&keys_file);
-
- return keys_data;
+static char* read_file_content(char* filePath, char* fileName) {
+
+ char fileNameWithPath[MAX_FILE_PATH_LENGTH];
+ snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
+ int rc = 0;
+
+ if (!zsys_file_exists(fileNameWithPath)) {
+ printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
+ return NULL;
+ }
+
+ zfile_t *keys_file = zfile_new (filePath, fileName);
+ rc = zfile_input (keys_file);
+ if (rc != 0) {
+ zfile_destroy(&keys_file);
+ printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+ return NULL;
+ }
+
+ ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
+ zchunk_t *keys_chunk = zfile_read (keys_file, keys_file_size, 0);
+ if (keys_chunk == NULL) {
+ zfile_close(keys_file);
+ zfile_destroy(&keys_file);
+ printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
+ return NULL;
+ }
+
+ char *keys_data = zchunk_strdup(keys_chunk);
+ zchunk_destroy(&keys_chunk);
+ zfile_close(keys_file);
+ zfile_destroy (&keys_file);
+
+ return keys_data;
}
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
- char *line = NULL, *saveLinePointer = NULL;
+static void parse_key_lines(char *keysBuffer, char **key, char **iv) {
+ char *line = NULL, *saveLinePointer = NULL;
- bool firstTime = true;
- do {
- if (firstTime){
- line = strtok_r(keysBuffer, "\n", &saveLinePointer);
- firstTime = false;
- }else {
- line = strtok_r(NULL, "\n", &saveLinePointer);
- }
+ bool firstTime = true;
+ do {
+ if (firstTime) {
+ line = strtok_r(keysBuffer, "\n", &saveLinePointer);
+ firstTime = false;
+ } else {
+ line = strtok_r(NULL, "\n", &saveLinePointer);
+ }
- if (line == NULL){
- break;
- }
+ if (line == NULL) {
+ break;
+ }
- parse_key_line(line, key, iv);
+ parse_key_line(line, key, iv);
- } while((*key == NULL || *iv == NULL) && line != NULL);
+ } while ((*key == NULL || *iv == NULL) && line != NULL);
}
-static void parse_key_line(char *line, char **key, char **iv){
- char *detectedKey = NULL, *detectedValue= NULL;
-
- char* sep_at = strchr(line, ':');
- if (sep_at == NULL){
- return;
- }
-
- *sep_at = '\0'; // overwrite first separator, creating two strings.
- detectedKey = line;
- detectedValue = sep_at + 1;
-
- if (detectedKey == NULL || detectedValue == NULL){
- return;
- }
- if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
- return;
- }
-
- if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
- *key = strndup(detectedValue, AES_KEY_LENGTH);
- } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
- *iv = strndup(detectedValue, AES_IV_LENGTH);
- }
+static void parse_key_line(char *line, char **key, char **iv) {
+ char *detectedKey = NULL, *detectedValue= NULL;
+
+ char *sep_at = strchr(line, ':');
+ if (sep_at == NULL) {
+ return;
+ }
+
+ *sep_at = '\0'; // overwrite first separator, creating two strings.
+ detectedKey = line;
+ detectedValue = sep_at + 1;
+
+ if (detectedKey == NULL || detectedValue == NULL) {
+ return;
+ }
+ if (detectedKey[0] == '\0' || detectedValue[0] == '\0') {
+ return;
+ }
+
+ if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0) {
+ *key = strndup(detectedValue, AES_KEY_LENGTH);
+ } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0) {
+ *iv = strndup(detectedValue, AES_IV_LENGTH);
+ }
}
static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
- // Load decrypted text buffer
- zchunk_t* secret_decrypted = zchunk_new(input, inputlen);
- if (secret_decrypted == NULL){
- printf("CRYPTO: Failed to create zchunk\n");
- return;
- }
-
- zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted);
- zchunk_destroy (&secret_decrypted);
- if (secret_config == NULL){
- printf("CRYPTO: Failed to create zconfig\n");
- return;
- }
-
- // Extract public and secret key from text buffer
- char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL);
- char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL);
-
- if (public_text == NULL || secret_text == NULL){
- zconfig_destroy(&secret_config);
- printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
- return;
- }
-
- *publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1);
- *secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
-
- zconfig_destroy(&secret_config);
+ // Load decrypted text buffer
+ zchunk_t *secret_decrypted = zchunk_new(input, inputlen);
+ if (secret_decrypted == NULL) {
+ printf("CRYPTO: Failed to create zchunk\n");
+ return;
+ }
+
+ zconfig_t *secret_config = zconfig_chunk_load(secret_decrypted);
+ zchunk_destroy (&secret_decrypted);
+ if (secret_config == NULL) {
+ printf("CRYPTO: Failed to create zconfig\n");
+ return;
+ }
+
+ // Extract public and secret key from text buffer
+ char *public_text = zconfig_get(secret_config, "/curve/public-key", NULL);
+ char *secret_text = zconfig_get(secret_config, "/curve/secret-key", NULL);
+
+ if (public_text == NULL || secret_text == NULL) {
+ zconfig_destroy(&secret_config);
+ printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+ return;
+ }
+
+ *publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1);
+ *secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
+
+ zconfig_destroy(&secret_config);
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
index 82fe576..0eef8f1 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
@@ -20,8 +20,8 @@
* zmq_crypto.h
*
* \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#ifndef ZMQ_CRYPTO_H_
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 161557b..96483d8 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -20,8 +20,8 @@
* publisher.h
*
* \date Jan 7, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#ifndef __PUBSUB_PUBLISHER_H_
@@ -30,14 +30,14 @@
#include <stdlib.h>
#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION "3.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION "3.0.0"
//properties
#define PUBSUB_PUBLISHER_TOPIC "topic"
#define PUBSUB_PUBLISHER_SCOPE "scope"
#define PUBSUB_PUBLISHER_CONFIG "pubsub.config"
-#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default"
+#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default"
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 23a9e9b..3debad9 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -20,8 +20,8 @@
* subscriber.h
*
* \date Jan 7, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#ifndef __PUBSUB_SUBSCRIBER_H_
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index c372973..a85fde6 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -21,20 +21,20 @@ find_package(Jansson REQUIRED)
add_celix_bundle(celix_pubsub_discovery_etcd
BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
VERSION "1.0.0"
- GROUP "Celix/PubSub"
+ GROUP "Celix/PubSub"
SOURCES
src/psd_activator.c
src/pubsub_discovery_impl.c
)
target_include_directories(celix_pubsub_discovery_etcd PRIVATE src)
target_include_directories(celix_pubsub_discovery_etcd SYSTEM PRIVATE
- ${CURL_INCLUDE_DIR}
- ${JANSSON_INCLUDE_DIR}
+ ${CURL_INCLUDE_DIR}
+ ${JANSSON_INCLUDE_DIR}
)
target_link_libraries(celix_pubsub_discovery_etcd PRIVATE
- Celix::pubsub_spi Celix::framework Celix::etcdlib_static
- Celix::shell_api Celix::log_helper
- ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+ Celix::pubsub_spi Celix::framework Celix::etcdlib_static
+ Celix::shell_api Celix::log_helper
+ ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 5201192..059098e 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -33,74 +33,74 @@
#include "pubsub_discovery_impl.h"
typedef struct psd_activator {
- pubsub_discovery_t *pubsub_discovery;
+ pubsub_discovery_t *pubsub_discovery;
- long publishAnnounceSvcTrackerId;
- //service_tracker_pt pstmPublishersTracker;
+ long publishAnnounceSvcTrackerId;
+ //service_tracker_pt pstmPublishersTracker;
- pubsub_announce_endpoint_listener_t listenerSvc;
- long listenerSvcId;
+ pubsub_announce_endpoint_listener_t listenerSvc;
+ long listenerSvcId;
- command_service_t cmdSvc;
- long cmdSvcId;
+ command_service_t cmdSvc;
+ long cmdSvcId;
- log_helper_t *loghelper;
+ log_helper_t *loghelper;
} psd_activator_t;
static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) {
- celix_status_t status;
-
- logHelper_create(ctx, &act->loghelper);
- logHelper_start(act->loghelper);
-
- act->pubsub_discovery = pubsub_discovery_create(ctx, act->loghelper);
- // pubsub_discovery_start needs to be first to initialize
- status = pubsub_discovery_start(act->pubsub_discovery);
-
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE;
- opts.callbackHandle = act->pubsub_discovery;
- opts.addWithOwner = pubsub_discovery_discoveredEndpointsListenerAdded;
- opts.removeWithOwner = pubsub_discovery_discoveredEndpointsListenerRemoved;
- act->publishAnnounceSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-
- act->listenerSvc.handle = act->pubsub_discovery;
- act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
- act->listenerSvc.revokeEndpoint = pubsub_discovery_revokeEndpoint;
-
- //register shell command service
- //register shell command
- if (status == CELIX_SUCCESS) {
- act->cmdSvc.handle = act->pubsub_discovery;
- act->cmdSvc.executeCommand = pubsub_discovery_executeCommand;
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psd_etcd");
- celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psd_etcd"); //TODO add search topic/scope option
- celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Overview of discovered/announced endpoints from/to ETCD");
- act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
- }
-
- if (status == CELIX_SUCCESS) {
- act->listenerSvcId = celix_bundleContext_registerService(ctx, &act->listenerSvc, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, NULL);
- } else {
- act->listenerSvcId = -1L;
- }
-
- return status;
+ celix_status_t status;
+
+ logHelper_create(ctx, &act->loghelper);
+ logHelper_start(act->loghelper);
+
+ act->pubsub_discovery = pubsub_discovery_create(ctx, act->loghelper);
+ // pubsub_discovery_start needs to be first to initialize
+ status = pubsub_discovery_start(act->pubsub_discovery);
+
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE;
+ opts.callbackHandle = act->pubsub_discovery;
+ opts.addWithOwner = pubsub_discovery_discoveredEndpointsListenerAdded;
+ opts.removeWithOwner = pubsub_discovery_discoveredEndpointsListenerRemoved;
+ act->publishAnnounceSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+ act->listenerSvc.handle = act->pubsub_discovery;
+ act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
+ act->listenerSvc.revokeEndpoint = pubsub_discovery_revokeEndpoint;
+
+ //register shell command service
+ //register shell command
+ if (status == CELIX_SUCCESS) {
+ act->cmdSvc.handle = act->pubsub_discovery;
+ act->cmdSvc.executeCommand = pubsub_discovery_executeCommand;
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psd_etcd");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psd_etcd"); //TODO add search topic/scope option
+ celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Overview of discovered/announced endpoints from/to ETCD");
+ act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ }
+
+ if (status == CELIX_SUCCESS) {
+ act->listenerSvcId = celix_bundleContext_registerService(ctx, &act->listenerSvc, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, NULL);
+ } else {
+ act->listenerSvcId = -1L;
+ }
+
+ return status;
}
static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_stopTracker(ctx, act->publishAnnounceSvcTrackerId);
- celix_bundleContext_unregisterService(ctx, act->listenerSvcId);
- celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ celix_bundleContext_stopTracker(ctx, act->publishAnnounceSvcTrackerId);
+ celix_bundleContext_unregisterService(ctx, act->listenerSvcId);
+ celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
- celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
- pubsub_discovery_destroy(act->pubsub_discovery);
+ celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
+ pubsub_discovery_destroy(act->pubsub_discovery);
- logHelper_stop(act->loghelper);
- logHelper_destroy(&act->loghelper);
+ logHelper_stop(act->loghelper);
+ logHelper_destroy(&act->loghelper);
- return status;
+ return status;
}
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 4adfbbd..f96ba39 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -526,7 +526,7 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
}
celix_properties_t* pubsub_discovery_parseEndpoint(pubsub_discovery_t *disc, const char *key, const char* etcdValue) {
- properties_t *props = celix_properties_create();
+ celix_properties_t *props = celix_properties_create();
// etcdValue contains the json formatted string
json_error_t error;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index 61c41e4..6a51902 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -33,56 +33,56 @@
#define PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE false
-#define PUBSUB_DISCOVERY_SERVER_IP_KEY "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define PUBSUB_DISCOVERY_SERVER_PORT_KEY "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define PUBSUB_DISCOVERY_SERVER_PATH_KEY "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define PUBSUB_DISCOVERY_SERVER_IP_KEY "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define PUBSUB_DISCOVERY_SERVER_PORT_KEY "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define PUBSUB_DISCOVERY_SERVER_PATH_KEY "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
#define PUBSUB_DISCOVERY_ETCD_TTL_KEY "PUBSUB_DISCOVERY_ETCD_TTL"
-#define PUBSUB_DISCOVERY_SERVER_IP_DEFAULT "127.0.0.1"
-#define PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT 2379
-#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT "pubsub/"
+#define PUBSUB_DISCOVERY_SERVER_IP_DEFAULT "127.0.0.1"
+#define PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT 2379
+#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT "pubsub/"
#define PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT 30
typedef struct pubsub_discovery {
- bundle_context_pt context;
- log_helper_t *logHelper;
+ bundle_context_pt context;
+ log_helper_t *logHelper;
- celix_thread_mutex_t discoveredEndpointsMutex; //when locked with EndpointsListenersMutex -> first lock this
- hash_map_pt discoveredEndpoints; //<key = uuid,celix_properties_t /*endpoint*/>>
+ celix_thread_mutex_t discoveredEndpointsMutex; //when locked with EndpointsListenersMutex -> first lock this
+ hash_map_pt discoveredEndpoints; //<key = uuid,celix_properties_t /*endpoint*/>>
- celix_thread_mutex_t announcedEndpointsMutex;
- hash_map_pt announcedEndpoints; //<key = char* (etcd key),pubsub_announce_entry_t /*endpoint*/>>
+ celix_thread_mutex_t announcedEndpointsMutex;
+ hash_map_pt announcedEndpoints; //<key = char* (etcd key),pubsub_announce_entry_t /*endpoint*/>>
- celix_thread_mutex_t discoveredEndpointsListenersMutex;
- hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discovered_endpoint_listener_t
+ celix_thread_mutex_t discoveredEndpointsListenersMutex;
+ hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discovered_endpoint_listener_t
- //NOTE using pthread instead of celix mutex/cond so that condwait with abs time using a MONOTONIC clock can be used
- pthread_mutex_t waitMutex;
- pthread_cond_t waitCond;
+ //NOTE using pthread instead of celix mutex/cond so that condwait with abs time using a MONOTONIC clock can be used
+ pthread_mutex_t waitMutex;
+ pthread_cond_t waitCond;
- celix_thread_mutex_t runningMutex;
+ celix_thread_mutex_t runningMutex;
bool running;
celix_thread_t watchThread;
celix_thread_t refreshTTLThread;
//configurable by config/env.
- const char *pubsubPath;
- bool verbose;
- int ttlForEntries;
- int sleepInsecBetweenTTLRefresh;
- const char *fwUUID;
+ const char *pubsubPath;
+ bool verbose;
+ int ttlForEntries;
+ int sleepInsecBetweenTTLRefresh;
+ const char *fwUUID;
} pubsub_discovery_t;
typedef struct pubsub_announce_entry {
- char *key; //etcd key
- bool isSet; //whether the value is already set (in case of unavailable etcd server this can linger)
- int refreshCount;
- int setCount;
- int errorCount;
- celix_properties_t *properties; //the endpoint properties
- struct timespec createTime; //from MONOTONIC clock
+ char *key; //etcd key
+ bool isSet; //whether the value is already set (in case of unavailable etcd server this can linger)
+ int refreshCount;
+ int setCount;
+ int errorCount;
+ celix_properties_t *properties; //the endpoint properties
+ struct timespec createTime; //from MONOTONIC clock
} pubsub_announce_entry_t;
diff --git a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
index 010e864..0231d81 100644
--- a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -21,14 +21,14 @@ find_package(Jansson REQUIRED)
add_celix_bundle(celix_pubsub_serializer_json
BUNDLE_SYMBOLICNAME "apache_celix_pubsub_serializer_json"
VERSION "1.0.0"
- GROUP "Celix/PubSub"
+ GROUP "Celix/PubSub"
SOURCES
src/ps_json_serializer_activator.c
- src/pubsub_serializer_impl.c
+ src/pubsub_serializer_impl.c
)
target_include_directories(celix_pubsub_serializer_json PRIVATE
- src
- ${JANSSON_INCLUDE_DIR}
+ src
+ ${JANSSON_INCLUDE_DIR}
)
set_target_properties(celix_pubsub_serializer_json PROPERTIES INSTALL_RPATH "$ORIGIN")
target_link_libraries(celix_pubsub_serializer_json PRIVATE Celix::pubsub_spi Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper)
diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
index 21f6a7d..4a77a71 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
@@ -24,36 +24,36 @@
#include "pubsub_serializer_impl.h"
typedef struct psjs_activator {
- pubsub_json_serializer_t* serializer;
+ pubsub_json_serializer_t* serializer;
- pubsub_serializer_service_t serializerSvc;
- long serializerSvcId;
+ pubsub_serializer_service_t serializerSvc;
+ long serializerSvcId;
} psjs_activator_t;
static int psjs_start(psjs_activator_t *act, celix_bundle_context_t *ctx) {
- act->serializerSvcId = -1L;
+ act->serializerSvcId = -1L;
- celix_status_t status = pubsubSerializer_create(ctx, &(act->serializer));
- if (status == CELIX_SUCCESS) {
- act->serializerSvc.handle = act->serializer;
+ celix_status_t status = pubsubSerializer_create(ctx, &(act->serializer));
+ if (status == CELIX_SUCCESS) {
+ act->serializerSvc.handle = act->serializer;
- act->serializerSvc.createSerializerMap = pubsubSerializer_createSerializerMap;
- act->serializerSvc.destroySerializerMap = pubsubSerializer_destroySerializerMap;
+ act->serializerSvc.createSerializerMap = pubsubSerializer_createSerializerMap;
+ act->serializerSvc.destroySerializerMap = pubsubSerializer_destroySerializerMap;
- /* Set serializer type */
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_JSON_SERIALIZER_TYPE);
+ /* Set serializer type */
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_JSON_SERIALIZER_TYPE);
- act->serializerSvcId = celix_bundleContext_registerService(ctx, &act->serializerSvc, PUBSUB_SERIALIZER_SERVICE_NAME, props);
- }
- return status;
+ act->serializerSvcId = celix_bundleContext_registerService(ctx, &act->serializerSvc, PUBSUB_SERIALIZER_SERVICE_NAME, props);
+ }
+ return status;
}
static int psjs_stop(psjs_activator_t *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_unregisterService(ctx, act->serializerSvcId);
- act->serializerSvcId = -1L;
- pubsubSerializer_destroy(act->serializer);
- return CELIX_SUCCESS;
+ celix_bundleContext_unregisterService(ctx, act->serializerSvcId);
+ act->serializerSvcId = -1L;
+ pubsubSerializer_destroy(act->serializer);
+ return CELIX_SUCCESS;
}
CELIX_GEN_BUNDLE_ACTIVATOR(psjs_activator_t, psjs_start, psjs_stop)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 3bc5364..4dc9916 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -34,12 +34,12 @@
#include "pubsub_serializer_impl.h"
-#define SYSTEM_BUNDLE_ARCHIVE_PATH "CELIX_FRAMEWORK_EXTENDER_PATH"
+#define SYSTEM_BUNDLE_ARCHIVE_PATH "CELIX_FRAMEWORK_EXTENDER_PATH"
#define MAX_PATH_LEN 1024
struct pubsub_json_serializer {
- bundle_context_pt bundle_context;
- log_helper_pt loghelper;
+ bundle_context_pt bundle_context;
+ log_helper_pt loghelper;
};
@@ -50,11 +50,11 @@ static void pubsubMsgSerializer_freeMsg(void* handle, void *msg);
typedef struct pubsub_json_msg_serializer_impl {
- dyn_message_type *msgType;
+ dyn_message_type *msgType;
- unsigned int msgId;
- const char* msgName;
- version_pt msgVersion;
+ unsigned int msgId;
+ const char* msgName;
+ version_pt msgVersion;
} pubsub_json_msg_serializer_impl_t;
static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
@@ -63,277 +63,274 @@ static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle
static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
- va_list ap;
- pubsub_json_serializer_t *serializer = handle;
- char *logStr = NULL;
- va_start(ap, msg);
- vasprintf(&logStr, msg, ap);
- va_end(ap);
- logHelper_log(serializer->loghelper, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
- free(logStr);
+ va_list ap;
+ pubsub_json_serializer_t *serializer = handle;
+ char *logStr = NULL;
+ va_start(ap, msg);
+ vasprintf(&logStr, msg, ap);
+ va_end(ap);
+ logHelper_log(serializer->loghelper, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+ free(logStr);
}
celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t** serializer) {
- celix_status_t status = CELIX_SUCCESS;
+ celix_status_t status = CELIX_SUCCESS;
- *serializer = calloc(1, sizeof(**serializer));
+ *serializer = calloc(1, sizeof(**serializer));
- if (!*serializer) {
- status = CELIX_ENOMEM;
- }
- else{
+ if (!*serializer) {
+ status = CELIX_ENOMEM;
+ }
+ else{
- (*serializer)->bundle_context= context;
+ (*serializer)->bundle_context= context;
- if (logHelper_create(context, &(*serializer)->loghelper) == CELIX_SUCCESS) {
- logHelper_start((*serializer)->loghelper);
- jsonSerializer_logSetup(dfi_log, (*serializer), 1);
- dynFunction_logSetup(dfi_log, (*serializer), 1);
- dynType_logSetup(dfi_log, (*serializer), 1);
- dynCommon_logSetup(dfi_log, (*serializer), 1);
- }
+ if (logHelper_create(context, &(*serializer)->loghelper) == CELIX_SUCCESS) {
+ logHelper_start((*serializer)->loghelper);
+ jsonSerializer_logSetup(dfi_log, (*serializer), 1);
+ dynFunction_logSetup(dfi_log, (*serializer), 1);
+ dynType_logSetup(dfi_log, (*serializer), 1);
+ dynCommon_logSetup(dfi_log, (*serializer), 1);
+ }
- }
+ }
- return status;
+ return status;
}
celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
- celix_status_t status = CELIX_SUCCESS;
+ celix_status_t status = CELIX_SUCCESS;
- logHelper_stop(serializer->loghelper);
- logHelper_destroy(&serializer->loghelper);
+ logHelper_stop(serializer->loghelper);
+ logHelper_destroy(&serializer->loghelper);
- free(serializer);
+ free(serializer);
- return status;
+ return status;
}
celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_json_serializer_t *serializer = handle;
-
- hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
-
- if (map != NULL) {
- pubsubSerializer_fillMsgSerializerMap(map, bundle);
- } else {
- logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
- status = CELIX_ENOMEM;
- }
-
- if (status == CELIX_SUCCESS) {
- *serializerMap = map;
- }
- return status;
+ celix_status_t status = CELIX_SUCCESS;
+ pubsub_json_serializer_t *serializer = handle;
+
+ hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
+
+ if (map != NULL) {
+ pubsubSerializer_fillMsgSerializerMap(map, bundle);
+ } else {
+ logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
+ status = CELIX_ENOMEM;
+ }
+
+ if (status == CELIX_SUCCESS) {
+ *serializerMap = map;
+ }
+ return status;
}
celix_status_t pubsubSerializer_destroySerializerMap(void* handle __attribute__((unused)), hash_map_pt serializerMap) {
- celix_status_t status = CELIX_SUCCESS;
- //pubsub_json_serializer_t *serializer = handle;
- if (serializerMap == NULL) {
- return CELIX_ILLEGAL_ARGUMENT;
- }
-
- hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
- pubsub_json_msg_serializer_impl_t *impl = msgSerializer->handle;
- dyn_message_type *dynMsg = impl->msgType;
- dynMessage_destroy(dynMsg); //note msgSer->name and msgSer->version owned by dynType
- free(msgSerializer); //also contains the service struct.
- free(impl);
- }
-
- hashMap_destroy(serializerMap, false, false);
-
- return status;
+ celix_status_t status = CELIX_SUCCESS;
+ //pubsub_json_serializer_t *serializer = handle;
+ if (serializerMap == NULL) {
+ return CELIX_ILLEGAL_ARGUMENT;
+ }
+
+ hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
+ pubsub_json_msg_serializer_impl_t *impl = msgSerializer->handle;
+ dyn_message_type *dynMsg = impl->msgType;
+ dynMessage_destroy(dynMsg); //note msgSer->name and msgSer->version owned by dynType
+ free(msgSerializer); //also contains the service struct.
+ free(impl);
+ }
+
+ hashMap_destroy(serializerMap, false, false);
+
+ return status;
}
celix_status_t pubsubMsgSerializer_serialize(void *handle, const void* msg, void** out, size_t *outLen) {
- celix_status_t status = CELIX_SUCCESS;
+ celix_status_t status = CELIX_SUCCESS;
- pubsub_json_msg_serializer_impl_t *impl = handle;
+ pubsub_json_msg_serializer_impl_t *impl = handle;
- char *jsonOutput = NULL;
- dyn_type* dynType = NULL;
- dyn_message_type *dynMsg = impl->msgType;
- dynMessage_getMessageType(dynMsg, &dynType);
+ char *jsonOutput = NULL;
+ dyn_type* dynType = NULL;
+ dyn_message_type *dynMsg = impl->msgType;
+ dynMessage_getMessageType(dynMsg, &dynType);
- if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0){
- status = CELIX_BUNDLE_EXCEPTION;
- }
+ if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
- if (status == CELIX_SUCCESS) {
- *out = jsonOutput;
- *outLen = strlen(jsonOutput) + 1;
- }
+ if (status == CELIX_SUCCESS) {
+ *out = jsonOutput;
+ *outLen = strlen(jsonOutput) + 1;
+ }
- return status;
+ return status;
}
celix_status_t pubsubMsgSerializer_deserialize(void* handle, const void* input, size_t inputLen, void **out) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_json_msg_serializer_impl_t *impl = handle;
- void *msg = NULL;
- dyn_type* dynType = NULL;
- dyn_message_type *dynMsg = impl->msgType;
- dynMessage_getMessageType(dynMsg, &dynType);
-
- if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) {
- status = CELIX_BUNDLE_EXCEPTION;
- }
- else{
- *out = msg;
- }
-
- return status;
+ celix_status_t status = CELIX_SUCCESS;
+ pubsub_json_msg_serializer_impl_t *impl = handle;
+ void *msg = NULL;
+ dyn_type* dynType = NULL;
+ dyn_message_type *dynMsg = impl->msgType;
+ dynMessage_getMessageType(dynMsg, &dynType);
+
+ if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ else{
+ *out = msg;
+ }
+
+ return status;
}
void pubsubMsgSerializer_freeMsg(void* handle, void *msg) {
- pubsub_json_msg_serializer_impl_t *impl = handle;
- dyn_type* dynType = NULL;
- dyn_message_type *dynMsg = impl->msgType;
- dynMessage_getMessageType(dynMsg, &dynType);
- if (dynType != NULL) {
- dynType_free(dynType, msg);
- }
+ pubsub_json_msg_serializer_impl_t *impl = handle;
+ dyn_type* dynType = NULL;
+ dyn_message_type *dynMsg = impl->msgType;
+ dynMessage_getMessageType(dynMsg, &dynType);
+ if (dynType != NULL) {
+ dynType_free(dynType, msg);
+ }
}
static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
- char* root = NULL;
- char* metaInfPath = NULL;
+ char* root = NULL;
+ char* metaInfPath = NULL;
- root = pubsubSerializer_getMsgDescriptionDir(bundle);
+ root = pubsubSerializer_getMsgDescriptionDir(bundle);
- if(root != NULL){
- asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
+ if (root != NULL) {
+ asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
- pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
- pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+ pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+ pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
- free(metaInfPath);
- free(root);
- }
+ free(metaInfPath);
+ free(root);
+ }
}
static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
{
- char *root = NULL;
+ char *root = NULL;
- bool isSystemBundle = false;
- bundle_isSystemBundle(bundle, &isSystemBundle);
+ bool isSystemBundle = false;
+ bundle_isSystemBundle(bundle, &isSystemBundle);
- if(isSystemBundle == true) {
- bundle_context_pt context;
- bundle_getContext(bundle, &context);
+ if (isSystemBundle == true) {
+ bundle_context_pt context;
+ bundle_getContext(bundle, &context);
- const char *prop = NULL;
+ const char *prop = NULL;
- bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+ bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
- if(prop != NULL) {
- root = strdup(prop);
- } else {
- root = getcwd(NULL, 0);
- }
- } else {
- bundle_getEntry(bundle, ".", &root);
- }
+ if (prop != NULL) {
+ root = strdup(prop);
+ } else {
+ root = getcwd(NULL, 0);
+ }
+ } else {
+ bundle_getEntry(bundle, ".", &root);
+ }
- return root;
+ return root;
}
static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
{
- char path[MAX_PATH_LEN];
- struct dirent *entry = NULL;
- DIR *dir = opendir(root);
-
- if(dir) {
- entry = readdir(dir);
- }
-
- while (entry != NULL) {
-
- if (strstr(entry->d_name, ".descriptor") != NULL) {
-
- printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
- snprintf(path, MAX_PATH_LEN, "%s/%s", root, entry->d_name);
- FILE *stream = fopen(path,"r");
-
- if (stream != NULL){
- dyn_message_type* msgType = NULL;
-
- int rc = dynMessage_parse(stream, &msgType);
- if (rc == 0 && msgType != NULL) {
-
- char* msgName = NULL;
- rc += dynMessage_getName(msgType,&msgName);
-
- version_pt msgVersion = NULL;
- rc += dynMessage_getVersion(msgType, &msgVersion);
-
- if(rc == 0 && msgName != NULL && msgVersion != NULL){
-
- unsigned int msgId = utils_stringHash(msgName);
-
- pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(*msgSerializer));
- pubsub_json_msg_serializer_impl_t *impl = calloc(1, sizeof(*impl));
-
- impl->msgType = msgType;
- impl->msgId = msgId;
- impl->msgName = msgName;
- impl->msgVersion = msgVersion;
-
- msgSerializer->handle = impl;
- msgSerializer->msgId = impl->msgId;
- msgSerializer->msgName = impl->msgName;
- msgSerializer->msgVersion = impl->msgVersion;
- msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
- msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
- msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
-
- bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
- if (clash){
- printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
- free(msgSerializer);
- free(impl);
- dynMessage_destroy(msgType);
- }
- else if (msgId != 0){
- printf("Adding %u : %s\n", msgId, msgName);
- hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, msgSerializer);
- }
- else{
- printf("Error creating msg serializer\n");
- free(impl);
- free(msgSerializer);
- dynMessage_destroy(msgType);
- }
-
- }
- else{
- printf("Cannot retrieve name and/or version from msg\n");
- }
-
- } else{
- printf("DMU: cannot parse message from descriptor %s\n.",path);
- }
- fclose(stream);
- }else{
- printf("DMU: cannot open descriptor file %s\n.",path);
- }
-
- }
- entry = readdir(dir);
- }
-
- if(dir) {
- closedir(dir);
- }
+ char path[MAX_PATH_LEN];
+ struct dirent *entry = NULL;
+ DIR *dir = opendir(root);
+
+ if (dir) {
+ entry = readdir(dir);
+ }
+
+ while (entry != NULL) {
+
+ if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+ printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+ snprintf(path, MAX_PATH_LEN, "%s/%s", root, entry->d_name);
+ FILE *stream = fopen(path,"r");
+
+ if (stream != NULL) {
+ dyn_message_type* msgType = NULL;
+
+ int rc = dynMessage_parse(stream, &msgType);
+ if (rc == 0 && msgType != NULL) {
+
+ char* msgName = NULL;
+ rc += dynMessage_getName(msgType,&msgName);
+
+ version_pt msgVersion = NULL;
+ rc += dynMessage_getVersion(msgType, &msgVersion);
+
+ if (rc == 0 && msgName != NULL && msgVersion != NULL) {
+
+ unsigned int msgId = utils_stringHash(msgName);
+
+ pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(*msgSerializer));
+ pubsub_json_msg_serializer_impl_t *impl = calloc(1, sizeof(*impl));
+
+ impl->msgType = msgType;
+ impl->msgId = msgId;
+ impl->msgName = msgName;
+ impl->msgVersion = msgVersion;
+
+ msgSerializer->handle = impl;
+ msgSerializer->msgId = impl->msgId;
+ msgSerializer->msgName = impl->msgName;
+ msgSerializer->msgVersion = impl->msgVersion;
+ msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
+ msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
+ msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+
+ bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+ if (clash) {
+ printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+ free(msgSerializer);
+ free(impl);
+ dynMessage_destroy(msgType);
+ } else if (msgId != 0) {
+ printf("Adding %u : %s\n", msgId, msgName);
+ hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, msgSerializer);
+ } else {
+ printf("Error creating msg serializer\n");
+ free(impl);
+ free(msgSerializer);
+ dynMessage_destroy(msgType);
+ }
+
+ } else {
+ printf("Cannot retrieve name and/or version from msg\n");
+ }
+
+ } else {
+ printf("DMU: cannot parse message from descriptor %s\n.",path);
+ }
+ fclose(stream);
+ } else {
+ printf("DMU: cannot open descriptor file %s\n.",path);
+ }
+
+ }
+ entry = readdir(dir);
+ }
+
+ if (dir) {
+ closedir(dir);
+ }
}
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
index 72ec18d..f44f6b2 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -27,7 +27,7 @@
#include "pubsub_serializer.h"
-#define PUBSUB_JSON_SERIALIZER_TYPE "json"
+#define PUBSUB_JSON_SERIALIZER_TYPE "json"
typedef struct pubsub_json_serializer pubsub_json_serializer_t;
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index ae4d89c..509c760 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -18,22 +18,22 @@
find_package(UUID REQUIRED)
if(NOT UUID_LIBRARY)
- #i.e. not found for OSX
- set(UUID_LIBRARY "")
- set(UUID_INCLUDE_DIRS "")
+ #i.e. not found for OSX
+ set(UUID_LIBRARY "")
+ set(UUID_INCLUDE_DIRS "")
endif()
add_library(pubsub_spi STATIC
src/pubsub_utils_match.c
src/pubsub_endpoint.c
src/pubsub_utils.c
- src/pubsub_admin_metrics.c
+ src/pubsub_admin_metrics.c
)
target_include_directories(pubsub_spi SYSTEM PRIVATE ${UUID_INCLUDE_DIRS})
set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
target_include_directories(pubsub_spi PUBLIC
- $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
- $<INSTALL_INTERFACE:include/celix/pubsub_spi>
+ $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
+ $<INSTALL_INTERFACE:include/celix/pubsub_spi>
)
target_link_libraries(pubsub_spi PUBLIC Celix::framework Celix::pubsub_api)
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 643322c..5735ef1 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -20,8 +20,8 @@
* pubsub_admin.h
*
* \date Sep 30, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
*/
#ifndef PUBSUB_ADMIN_H_
@@ -31,33 +31,33 @@
#include "celix_bundle.h"
#include "celix_filter.h"
-#define PUBSUB_ADMIN_SERVICE_NAME "pubsub_admin"
-#define PUBSUB_ADMIN_SERVICE_VERSION "3.0.0"
-#define PUBSUB_ADMIN_SERVICE_RANGE "[3,4)"
+#define PUBSUB_ADMIN_SERVICE_NAME "pubsub_admin"
+#define PUBSUB_ADMIN_SERVICE_VERSION "3.0.0"
+#define PUBSUB_ADMIN_SERVICE_RANGE "[3,4)"
//expected service properties
-#define PUBSUB_ADMIN_SERVICE_TYPE "psa_type"
+#define PUBSUB_ADMIN_SERVICE_TYPE "psa_type"
-#define PUBSUB_ADMIN_FULL_MATCH_SCORE 100.0F
-#define PUBSUB_ADMIN_NO_MATCH_SCORE 0.0F
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE 100.0F
+#define PUBSUB_ADMIN_NO_MATCH_SCORE 0.0F
struct pubsub_admin_service {
- void *handle;
+ void *handle;
- celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScopre, long *outSerializerSvcId);
- celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
- celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
+ celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScopre, long *outSerializerSvcId);
+ celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+ celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
- //note endpoint is owned by caller
- celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
- celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
+ //note endpoint is owned by caller
+ celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+ celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
- //note endpoint is owned by caller
- celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
- celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
+ //note endpoint is owned by caller
+ celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+ celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
- celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
- celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
+ celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
+ celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
};
typedef struct pubsub_admin_service pubsub_admin_service_t;
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h
index 51be204..e6b1e24 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h
@@ -24,59 +24,59 @@
#include <sys/time.h>
#include "celix_array_list.h"
-#define PUBSUB_ADMIN_METRICS_SERVICE_NAME "pubsub_admin_metrics [version 1.0]"
+#define PUBSUB_ADMIN_METRICS_SERVICE_NAME "pubsub_admin_metrics [version 1.0]"
-#define PUBSUB_AMDIN_METRICS_NAME_MAX 1024
+#define PUBSUB_AMDIN_METRICS_NAME_MAX 1024
typedef struct pubsub_admin_sender_msg_type_metrics {
- long bndId;
- char typeFqn[PUBSUB_AMDIN_METRICS_NAME_MAX];
- unsigned int typeId;
- unsigned long nrOfMessagesSend;
- unsigned long nrOfMessagesSendFailed;
- unsigned long nrOfSerializationErrors;
- struct timespec lastMessageSend;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
+ long bndId;
+ char typeFqn[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ unsigned int typeId;
+ unsigned long nrOfMessagesSend;
+ unsigned long nrOfMessagesSendFailed;
+ unsigned long nrOfSerializationErrors;
+ struct timespec lastMessageSend;
+ double averageTimeBetweenMessagesInSeconds;
+ double averageSerializationTimeInSeconds;
} pubsub_admin_sender_msg_type_metrics_t;
typedef struct pubsub_admin_sender_metrics {
- char scope[PUBSUB_AMDIN_METRICS_NAME_MAX];
- char topic[PUBSUB_AMDIN_METRICS_NAME_MAX];
- unsigned long nrOfUnknownMessagesRetrieved;
- unsigned int nrOfmsgMetrics;
- pubsub_admin_sender_msg_type_metrics_t *msgMetrics; //size = nrOfMessageTypes
+ char scope[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ char topic[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ unsigned long nrOfUnknownMessagesRetrieved;
+ unsigned int nrOfmsgMetrics;
+ pubsub_admin_sender_msg_type_metrics_t *msgMetrics; //size = nrOfMessageTypes
} pubsub_admin_sender_metrics_t;
typedef struct pubsub_admin_receiver_metrics {
- char scope[PUBSUB_AMDIN_METRICS_NAME_MAX];
- char topic[PUBSUB_AMDIN_METRICS_NAME_MAX];
- unsigned long nrOfMsgTypes;
- struct {
- unsigned int typeId;
- char typeFqn[PUBSUB_AMDIN_METRICS_NAME_MAX];
- int nrOfOrigins;
- struct {
- uuid_t originUUID;
- unsigned long nrOfMessagesReceived;
- unsigned long nrOfSerializationErrors;
- unsigned long nrOfMissingSeqNumbers;
- struct timespec lastMessageReceived;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
- double averageDelayInSeconds;
- double minDelayInSeconds;
- double maxDelayInSeconds;
- } *origins;
- } *msgTypes;
+ char scope[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ char topic[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ unsigned long nrOfMsgTypes;
+ struct {
+ unsigned int typeId;
+ char typeFqn[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ int nrOfOrigins;
+ struct {
+ uuid_t originUUID;
+ unsigned long nrOfMessagesReceived;
+ unsigned long nrOfSerializationErrors;
+ unsigned long nrOfMissingSeqNumbers;
+ struct timespec lastMessageReceived;
+ double averageTimeBetweenMessagesInSeconds;
+ double averageSerializationTimeInSeconds;
+ double averageDelayInSeconds;
+ double minDelayInSeconds;
+ double maxDelayInSeconds;
+ } *origins;
+ } *msgTypes;
} pubsub_admin_receiver_metrics_t;
typedef struct pubsub_admin_metrics {
- char psaType[PUBSUB_AMDIN_METRICS_NAME_MAX];
+ char psaType[PUBSUB_AMDIN_METRICS_NAME_MAX];
- celix_array_list_t *senders; //entry type = pubsub_admin_sender_metrics_t
- celix_array_list_t *receivers;//entry type = pubsub_admin_receiver_metrics_t
+ celix_array_list_t *senders; //entry type = pubsub_admin_sender_metrics_t
+ celix_array_list_t *receivers;//entry type = pubsub_admin_receiver_metrics_t
} pubsub_admin_metrics_t;
@@ -86,15 +86,15 @@ typedef struct pubsub_admin_metrics {
* Expected service properties: PUBSUB_ADMIN_SERVICE_TYPE
*/
struct pubsub_admin_metrics_service {
- void *handle;
-
- /**
- * Creates a metrics struct for the PSA. The caller is owner of the data and
- * should use pubsub_freePubSubAdminMetrics (part of pubsub_spi) to release the data.
- * @param handle
- * @return The metrics or NULL if no metrics can be created.
- */
- pubsub_admin_metrics_t* (*metrics)(void *handle);
+ void *handle;
+
+ /**
+ * Creates a metrics struct for the PSA. The caller is owner of the data and
+ * should use pubsub_freePubSubAdminMetrics (part of pubsub_spi) to release the data.
+ * @param handle
+ * @return The metrics or NULL if no metrics can be created.
+ */
+ pubsub_admin_metrics_t* (*metrics)(void *handle);
};
void pubsub_freePubSubAdminMetrics(pubsub_admin_metrics_t *metrics);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index 327acf9..5b40ca9 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -20,7 +20,7 @@
#ifndef PUBSUB_CONSTANTS_H_
#define PUBSUB_CONSTANTS_H_
-#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config"
+#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config"
#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer"
/**
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
index c0bbc24..52847ba 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -27,10 +27,10 @@
//Informs the topology manager that pub/sub endpoints are discovered in the network
struct pubsub_discovered_endpoint_listener {
- void *handle;
+ void *handle;
- celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *properties);
- celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *properties);
+ celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *properties);
+ celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *properties);
};
typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_listener_t;
@@ -40,10 +40,10 @@ typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_li
//Informs the pubsub discoveries to announce/revoke endpoint
struct pubsub_announce_endpoint_listener {
- void *handle;
+ void *handle;
- celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties);
- celix_status_t (*revokeEndpoint)(void *handle, const celix_properties_t *properties);
+ celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties);
+ celix_status_t (*revokeEndpoint)(void *handle, const celix_properties_t *properties);
};
typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
index 7aeb2e0..9d0846a 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -33,28 +33,28 @@
* the extender pattern.
*/
-#define PUBSUB_SERIALIZER_SERVICE_NAME "pubsub_serializer"
-#define PUBSUB_SERIALIZER_SERVICE_VERSION "1.0.0"
-#define PUBSUB_SERIALIZER_SERVICE_RANGE "[1,2)"
+#define PUBSUB_SERIALIZER_SERVICE_NAME "pubsub_serializer"
+#define PUBSUB_SERIALIZER_SERVICE_VERSION "1.0.0"
+#define PUBSUB_SERIALIZER_SERVICE_RANGE "[1,2)"
typedef struct pubsub_msg_serializer {
- void* handle;
+ void* handle;
- unsigned int msgId;
- const char* msgName;
- version_pt msgVersion;
+ unsigned int msgId;
+ const char* msgName;
+ version_pt msgVersion;
- celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen);
- celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
- void (*freeMsg)(void* handle, void* msg);
+ celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen);
+ celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+ void (*freeMsg)(void* handle, void* msg);
} pubsub_msg_serializer_t;
typedef struct pubsub_serializer_service {
- void* handle;
+ void* handle;
- celix_status_t (*createSerializerMap)(void* handle, celix_bundle_t *bundle, hash_map_pt* serializerMap);
- celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap);
+ celix_status_t (*createSerializerMap)(void* handle, celix_bundle_t *bundle, hash_map_pt* serializerMap);
+ celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap);
} pubsub_serializer_service_t;
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index e7e5644..7c80745 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -26,9 +26,9 @@
#ifdef __cplusplus
extern "C" {
#endif
-#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY "qos"
+#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY "qos"
#define PUBSUB_UTILS_QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */
-#define PUBSUB_UTILS_QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */
+#define PUBSUB_UTILS_QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */
/**
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 59f64c8..6805b79 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -46,42 +46,42 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
//copy topic properties
if (topic_props != NULL) {
- const char *key = NULL;
- CELIX_PROPERTIES_FOR_EACH((celix_properties_t *) topic_props, key) {
- celix_properties_set(ep, key, celix_properties_get(topic_props, key, NULL));
- }
- }
-
-
- char endpointUuid[37];
- uuid_t endpointUid;
- uuid_generate(endpointUid);
- uuid_unparse(endpointUid, endpointUuid);
- celix_properties_set(ep, PUBSUB_ENDPOINT_UUID, endpointUuid);
-
- if (fwUUID != NULL) {
- celix_properties_set(ep, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
- }
-
- if (scope != NULL) {
- celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
- }
-
- if (topic != NULL) {
- celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_NAME, topic);
- }
-
- if (pubsubType != NULL) {
- celix_properties_set(ep, PUBSUB_ENDPOINT_TYPE, pubsubType);
- }
-
- if (adminType != NULL) {
- celix_properties_set(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, adminType);
- }
-
- if (serType != NULL) {
- celix_properties_set(ep, PUBSUB_ENDPOINT_SERIALIZER, serType);
- }
+ const char *key = NULL;
+ CELIX_PROPERTIES_FOR_EACH((celix_properties_t *) topic_props, key) {
+ celix_properties_set(ep, key, celix_properties_get(topic_props, key, NULL));
+ }
+ }
+
+
+ char endpointUuid[37];
+ uuid_t endpointUid;
+ uuid_generate(endpointUid);
+ uuid_unparse(endpointUid, endpointUuid);
+ celix_properties_set(ep, PUBSUB_ENDPOINT_UUID, endpointUuid);
+
+ if (fwUUID != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
+ }
+
+ if (scope != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
+ }
+
+ if (topic != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_NAME, topic);
+ }
+
+ if (pubsubType != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_TYPE, pubsubType);
+ }
+
+ if (adminType != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, adminType);
+ }
+
+ if (serType != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_SERIALIZER, serType);
+ }
}
celix_properties_t* pubsubEndpoint_create(
@@ -92,67 +92,67 @@ celix_properties_t* pubsubEndpoint_create(
const char* adminType,
const char *serType,
celix_properties_t *topic_props) {
- celix_properties_t *ep = celix_properties_create();
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
- if (!pubsubEndpoint_isValid(ep, true, true)) {
- celix_properties_destroy(ep);
- ep = NULL;
+ celix_properties_t *ep = celix_properties_create();
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
+ if (!pubsubEndpoint_isValid(ep, true, true)) {
+ celix_properties_destroy(ep);
+ ep = NULL;
}
- return ep;
+ return ep;
}
struct retrieve_topic_properties_data {
- celix_properties_t *props;
- const char *topic;
- bool isPublisher;
+ celix_properties_t *props;
+ const char *topic;
+ bool isPublisher;
};
static void retrieveTopicProperties(void *handle, const celix_bundle_t *bnd) {
- struct retrieve_topic_properties_data *data = handle;
- data->props = pubsub_utils_getTopicProperties(bnd, data->topic, data->isPublisher);
+ struct retrieve_topic_properties_data *data = handle;
+ data->props = pubsub_utils_getTopicProperties(bnd, data->topic, data->isPublisher);
}
celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx, long bundleId, const celix_properties_t *svcProps) {
celix_properties_t *ep = celix_properties_create();
- const char* fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
- const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
- const char* topic = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+ const char* fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+ const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+ const char* topic = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_TOPIC, NULL);
- struct retrieve_topic_properties_data data;
- data.props = NULL;
- data.isPublisher = false;
- data.topic = topic;
- celix_bundleContext_useBundle(ctx, bundleId, &data, retrieveTopicProperties);
+ struct retrieve_topic_properties_data data;
+ data.props = NULL;
+ data.isPublisher = false;
+ data.topic = topic;
+ celix_bundleContext_useBundle(ctx, bundleId, &data, retrieveTopicProperties);
- const char *pubsubType = PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
+ const char *pubsubType = PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, data.props);
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, data.props);
- if(data.props != NULL){
- celix_properties_destroy(data.props); //Can be deleted since setFields invokes properties_copy
- }
+ if (data.props != NULL) {
+ celix_properties_destroy(data.props); //Can be deleted since setFields invokes properties_copy
+ }
- if (!pubsubEndpoint_isValid(ep, false, false)) {
- celix_properties_destroy(ep);
- ep = NULL;
- }
- return ep;
+ if (!pubsubEndpoint_isValid(ep, false, false)) {
+ celix_properties_destroy(ep);
+ ep = NULL;
+ }
+ return ep;
}
celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context_t *ctx, long bundleId, const char *filter) {
celix_properties_t *ep = celix_properties_create();
- const char* fwUUID=NULL;
- bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+ const char* fwUUID=NULL;
+ bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
assert(fwUUID != NULL);
- char* topic = NULL;
- char* scopeFromFilter = NULL;
- pubsub_getPubSubInfoFromFilter(filter, &topic, &scopeFromFilter);
- const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+ char* topic = NULL;
+ char* scopeFromFilter = NULL;
+ pubsub_getPubSubInfoFromFilter(filter, &topic, &scopeFromFilter);
+ const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
struct retrieve_topic_properties_data data;
data.props = NULL;
@@ -161,61 +161,61 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
celix_bundleContext_useBundle(ctx, bundleId, &data, retrieveTopicProperties);
if (data.props != NULL) {
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, data.props);
- celix_properties_destroy(data.props); //safe to delete, properties are copied in pubsubEndpoint_setFields
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, data.props);
+ celix_properties_destroy(data.props); //safe to delete, properties are copied in pubsubEndpoint_setFields
}
- if (!pubsubEndpoint_isValid(ep, false, false)) {
- celix_properties_destroy(ep);
- ep = NULL;
- }
+ if (!pubsubEndpoint_isValid(ep, false, false)) {
+ celix_properties_destroy(ep);
+ ep = NULL;
+ }
- free(topic);
- free(scopeFromFilter);
+ free(topic);
+ free(scopeFromFilter);
- return ep;
+ return ep;
}
bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properties_t *psEp2) {
- if (psEp1 && psEp2) {
- const char *uuid1 = celix_properties_get(psEp1, PUBSUB_ENDPOINT_UUID, "entry1");
+ if (psEp1 && psEp2) {
+ const char *uuid1 = celix_properties_get(psEp1, PUBSUB_ENDPOINT_UUID, "entry1");
const char *uuid2 = celix_properties_get(psEp1, PUBSUB_ENDPOINT_UUID, "entry1");
return strcmp(uuid1, uuid2) == 0;
- } else {
- return false;
- }
+ } else {
+ return false;
+ }
}
char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
- char *result = NULL;
- asprintf(&result, "%s:%s", scope, topic);
- return result;
+ char *result = NULL;
+ asprintf(&result, "%s:%s", scope, topic);
+ return result;
}
static bool checkProp(const celix_properties_t *props, const char *key) {
- const char *val = celix_properties_get(props, key, NULL);
- if (val == NULL) {
- fprintf(stderr, "[Error] Missing mandatory entry for endpoint. Missing key is '%s'\n", key);
- }
- return val != NULL;
+ const char *val = celix_properties_get(props, key, NULL);
+ if (val == NULL) {
+ fprintf(stderr, "[Error] Missing mandatory entry for endpoint. Missing key is '%s'\n", key);
+ }
+ return val != NULL;
}
bool pubsubEndpoint_isValid(const celix_properties_t *props, bool requireAdminType, bool requireSerializerType) {
- bool p1 = checkProp(props, PUBSUB_ENDPOINT_UUID);
- bool p2 = checkProp(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
- bool p3 = checkProp(props, PUBSUB_ENDPOINT_TYPE);
- bool p4 = true;
- if (requireAdminType) {
- checkProp(props, PUBSUB_ENDPOINT_ADMIN_TYPE);
- }
- bool p5 = true;
- if (requireSerializerType) {
- checkProp(props, PUBSUB_ENDPOINT_SERIALIZER);
- }
- bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME);
- bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
-
- return p1 && p2 && p3 && p4 && p5 && p6 && p7;
+ bool p1 = checkProp(props, PUBSUB_ENDPOINT_UUID);
+ bool p2 = checkProp(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+ bool p3 = checkProp(props, PUBSUB_ENDPOINT_TYPE);
+ bool p4 = true;
+ if (requireAdminType) {
+ checkProp(props, PUBSUB_ENDPOINT_ADMIN_TYPE);
+ }
+ bool p5 = true;
+ if (requireSerializerType) {
+ checkProp(props, PUBSUB_ENDPOINT_SERIALIZER);
+ }
+ bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME);
+ bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
+
+ return p1 && p2 && p3 && p4 && p5 && p6 && p7;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
index d1327ec..b6ed882 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -38,48 +38,48 @@
celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topicOut, char **scopeOut) {
- celix_status_t status = CELIX_SUCCESS;
- const char *topic = NULL;
- const char *scope = NULL;
- const char *objectClass = NULL;
- celix_filter_t *filter = filter_create(filterstr);
- if (filter != NULL) {
- if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=example))
- array_list_t *attributes = filter->children;
- unsigned int i;
- unsigned int size = arrayList_size(attributes);
- for (i = 0; i < size; ++i) {
- filter_t *attr = arrayList_get(attributes, i);
- if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) {
- if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) {
- objectClass = attr->value;
- } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
- topic = attr->value;
- } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
- scope = attr->value;
- }
- }
- }
- }
- }
-
- if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) {
- //NOTE topic must be present, scope can be present in the filter.
- *topicOut = strdup(topic);
+ celix_status_t status = CELIX_SUCCESS;
+ const char *topic = NULL;
+ const char *scope = NULL;
+ const char *objectClass = NULL;
+ celix_filter_t *filter = celix_filter_create(filterstr);
+ if (filter != NULL) {
+ if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=example))
+ array_list_t *attributes = filter->children;
+ unsigned int i;
+ unsigned int size = arrayList_size(attributes);
+ for (i = 0; i < size; ++i) {
+ filter_t *attr = arrayList_get(attributes, i);
+ if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) {
+ if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) {
+ objectClass = attr->value;
+ } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
+ topic = attr->value;
+ } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
+ scope = attr->value;
+ }
+ }
+ }
+ }
+ }
+
+ if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) {
+ //NOTE topic must be present, scope can be present in the filter.
+ *topicOut = strdup(topic);
if (scope != NULL) {
- *scopeOut = strdup(scope);
- } else {
- *scopeOut = NULL;
- }
- } else {
- *topicOut = NULL;
- *scopeOut = NULL;
- }
-
- if (filter != NULL) {
+ *scopeOut = strdup(scope);
+ } else {
+ *scopeOut = NULL;
+ }
+ } else {
+ *topicOut = NULL;
+ *scopeOut = NULL;
+ }
+
+ if (filter != NULL) {
filter_destroy(filter);
}
- return status;
+ return status;
}
@@ -91,72 +91,72 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi
*/
char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
{
- array_list_pt bundles = NULL;
- bundleContext_getBundles(ctx, &bundles);
- int nrOfBundles = arrayList_size(bundles);
- long bundle_id = -1;
- char* result = NULL;
-
- for (int i = 0; i < nrOfBundles; i++){
- bundle_pt b = arrayList_get(bundles, i);
-
- /* Skip bundle 0 (framework bundle) since it has no path nor revisions */
- bundle_getBundleId(b, &bundle_id);
- if(bundle_id==0){
- continue;
- }
-
- char* dir = NULL;
- bundle_getEntry(b, ".", &dir);
-
- char cert_dir[MAX_KEYBUNDLE_LENGTH];
- snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir);
-
- struct stat s;
- int err = stat(cert_dir, &s);
- if (err != -1){
- if (S_ISDIR(s.st_mode)){
- result = dir;
- break;
- }
- }
-
- free(dir);
- }
-
- arrayList_destroy(bundles);
-
- return result;
+ array_list_pt bundles = NULL;
+ bundleContext_getBundles(ctx, &bundles);
+ int nrOfBundles = arrayList_size(bundles);
+ long bundle_id = -1;
+ char* result = NULL;
+
+ for (int i = 0; i < nrOfBundles; i++) {
+ bundle_pt b = arrayList_get(bundles, i);
+
+ /* Skip bundle 0 (framework bundle) since it has no path nor revisions */
+ bundle_getBundleId(b, &bundle_id);
+ if (bundle_id == 0) {
+ continue;
+ }
+
+ char* dir = NULL;
+ bundle_getEntry(b, ".", &dir);
+
+ char cert_dir[MAX_KEYBUNDLE_LENGTH];
+ snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir);
+
+ struct stat s;
+ int err = stat(cert_dir, &s);
+ if (err != -1) {
+ if (S_ISDIR(s.st_mode)) {
+ result = dir;
+ break;
+ }
+ }
+
+ free(dir);
+ }
+
+ arrayList_destroy(bundles);
+
+ return result;
}
celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher) {
- celix_properties_t *topic_props = NULL;
+ celix_properties_t *topic_props = NULL;
- bool isSystemBundle = false;
- bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
- long bundleId = -1;
- bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
- bundle_getBundleId((bundle_pt)bundle,&bundleId);
+ bool isSystemBundle = false;
+ bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
+ long bundleId = -1;
+ bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
+ bundle_getBundleId((bundle_pt)bundle,&bundleId);
- if(isSystemBundle == false) {
+ if (isSystemBundle == false) {
- char *bundleRoot = NULL;
- char* topicPropertiesPath = NULL;
- bundle_getEntry((bundle_pt)bundle, ".", &bundleRoot);
+ char *bundleRoot = NULL;
+ char* topicPropertiesPath = NULL;
+ bundle_getEntry((bundle_pt)bundle, ".", &bundleRoot);
- if(bundleRoot != NULL){
+ if (bundleRoot != NULL) {
- asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher? "pub":"sub", topic);
- topic_props = properties_load(topicPropertiesPath);
- if(topic_props==NULL){
- printf("PubSub: Could not load properties for %s on topic %s. Searched location %s, bundleId=%ld\n", isPublisher? "publication":"subscription", topic, topicPropertiesPath, bundleId);
- }
+ asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher? "pub":"sub", topic);
+ topic_props = celix_properties_load(topicPropertiesPath);
+ if (topic_props == NULL) {
+ printf("PubSub: Could not load properties for %s on topic %s. Searched location %s, bundleId=%ld\n", isPublisher? "publication":"subscription", topic, topicPropertiesPath, bundleId);
+ }
- free(topicPropertiesPath);
- free(bundleRoot);
- }
- }
+ free(topicPropertiesPath);
+ free(bundleRoot);
+ }
+ }
- return topic_props;
+ return topic_props;
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index a10b45d..3d8a0bd 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -31,202 +31,202 @@
#include "constants.h"
static double getPSAScore(const char *requested_admin, const char *request_qos, const char *adminType, double sampleScore, double controlScore, double defaultScore) {
- double score;
- if (requested_admin != NULL && strncmp(requested_admin, adminType, strlen(adminType)) == 0) {
- /* We got precise specification on the pubsub_admin we want */
- //Full match
- score = PUBSUB_ADMIN_FULL_MATCH_SCORE;
- } else if (requested_admin != NULL) {
- //admin type requested, but no match -> do not select this psa
- score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- } else if (request_qos != NULL && strncmp(request_qos, PUBSUB_UTILS_QOS_TYPE_SAMPLE, strlen(PUBSUB_UTILS_QOS_TYPE_SAMPLE)) == 0) {
- //qos match
- score = sampleScore;
- } else if (request_qos != NULL && strncmp(request_qos, PUBSUB_UTILS_QOS_TYPE_CONTROL, strlen(PUBSUB_UTILS_QOS_TYPE_CONTROL)) == 0) {
- //qos match
- score = controlScore;
- } else if (request_qos != NULL) {
- //note unsupported qos -> defaultScore
- score = defaultScore;
- } else {
- //default match
- score = defaultScore;
- }
- return score;
+ double score;
+ if (requested_admin != NULL && strncmp(requested_admin, adminType, strlen(adminType)) == 0) {
+ /* We got precise specification on the pubsub_admin we want */
+ //Full match
+ score = PUBSUB_ADMIN_FULL_MATCH_SCORE;
+ } else if (requested_admin != NULL) {
+ //admin type requested, but no match -> do not select this psa
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ } else if (request_qos != NULL && strncmp(request_qos, PUBSUB_UTILS_QOS_TYPE_SAMPLE, strlen(PUBSUB_UTILS_QOS_TYPE_SAMPLE)) == 0) {
+ //qos match
+ score = sampleScore;
+ } else if (request_qos != NULL && strncmp(request_qos, PUBSUB_UTILS_QOS_TYPE_CONTROL, strlen(PUBSUB_UTILS_QOS_TYPE_CONTROL)) == 0) {
+ //qos match
+ score = controlScore;
+ } else if (request_qos != NULL) {
+ //note unsupported qos -> defaultScore
+ score = defaultScore;
+ } else {
+ //default match
+ score = defaultScore;
+ }
+ return score;
}
struct psa_serializer_selection_data {
- const char *requested_serializer;
- long matchingSvcId;
+ const char *requested_serializer;
+ long matchingSvcId;
};
void psa_serializer_selection_callback(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
- struct psa_serializer_selection_data *data = handle;
- const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
- if (serType == NULL) {
- fprintf(stderr, "Warning found serializer without mandatory serializer type key (%s)\n", PUBSUB_SERIALIZER_TYPE_KEY);
- } else {
- if (strncmp(data->requested_serializer, serType, 1024 * 1024) == 0) {
- data->matchingSvcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
- }
- }
+ struct psa_serializer_selection_data *data = handle;
+ const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+ if (serType == NULL) {
+ fprintf(stderr, "Warning found serializer without mandatory serializer type key (%s)\n", PUBSUB_SERIALIZER_TYPE_KEY);
+ } else {
+ if (strncmp(data->requested_serializer, serType, 1024 * 1024) == 0) {
+ data->matchingSvcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ }
+ }
}
static long getPSASerializer(celix_bundle_context_t *ctx, const char *requested_serializer) {
- long svcId;
-
- if (requested_serializer != NULL) {
- struct psa_serializer_selection_data data;
- data.requested_serializer = requested_serializer;
- data.matchingSvcId = -1L;
-
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = &data;
- opts.useWithProperties = psa_serializer_selection_callback;
- celix_bundleContext_useServicesWithOptions(ctx, &opts);
- svcId = data.matchingSvcId;
- } else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.ignoreServiceLanguage = true;
-
- //note findService will automatically return the highest ranking service id
- svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
- }
-
- return svcId;
+ long svcId;
+
+ if (requested_serializer != NULL) {
+ struct psa_serializer_selection_data data;
+ data.requested_serializer = requested_serializer;
+ data.matchingSvcId = -1L;
+
+ celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = &data;
+ opts.useWithProperties = psa_serializer_selection_callback;
+ celix_bundleContext_useServicesWithOptions(ctx, &opts);
+ svcId = data.matchingSvcId;
+ } else {
+ celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+ opts.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.ignoreServiceLanguage = true;
+
+ //note findService will automatically return the highest ranking service id
+ svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
+ }
+
+ return svcId;
}
double pubsub_utils_matchPublisher(
- celix_bundle_context_t *ctx,
- long bundleId,
- const char *filter,
- const char *adminType,
- double sampleScore,
- double controlScore,
- double defaultScore,
- celix_properties_t **outTopicProperties,
- long *outSerializerSvcId) {
-
- celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
- const char *requested_admin = NULL;
- const char *requested_qos = NULL;
- if (ep != NULL) {
- requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
- requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
- }
-
- double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
-
- const char *requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
- long serializerSvcId = getPSASerializer(ctx, requested_serializer);
-
- if (serializerSvcId < 0) {
- score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
- }
-
-// printf("Score publisher service for psa type %s is %f\n", adminType, score);
-
- if (outSerializerSvcId != NULL) {
- *outSerializerSvcId = serializerSvcId;
- }
-
- if (outTopicProperties != NULL) {
- *outTopicProperties = ep;
- } else if (ep != NULL) {
- celix_properties_destroy(ep);
- }
-
- return score;
+ celix_bundle_context_t *ctx,
+ long bundleId,
+ const char *filter,
+ const char *adminType,
+ double sampleScore,
+ double controlScore,
+ double defaultScore,
+ celix_properties_t **outTopicProperties,
+ long *outSerializerSvcId) {
+
+ celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
+ const char *requested_admin = NULL;
+ const char *requested_qos = NULL;
+ if (ep != NULL) {
+ requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+ requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
+ }
+
+ double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
+
+ const char *requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+ long serializerSvcId = getPSASerializer(ctx, requested_serializer);
+
+ if (serializerSvcId < 0) {
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
+ }
+
+// printf("Score publisher service for psa type %s is %f\n", adminType, score);
+
+ if (outSerializerSvcId != NULL) {
+ *outSerializerSvcId = serializerSvcId;
+ }
+
+ if (outTopicProperties != NULL) {
+ *outTopicProperties = ep;
+ } else if (ep != NULL) {
+ celix_properties_destroy(ep);
+ }
+
+ return score;
}
typedef struct pubsub_match_retrieve_topic_properties_data {
- const char *topic;
- bool isPublisher;
+ const char *topic;
+ bool isPublisher;
- celix_properties_t *outEndpoint;
+ celix_properties_t *outEndpoint;
} pubsub_get_topic_properties_data_t;
static void getTopicPropertiesCallback(void *handle, const celix_bundle_t *bnd) {
- pubsub_get_topic_properties_data_t *data = handle;
- data->outEndpoint = pubsub_utils_getTopicProperties(bnd, data->topic, data->isPublisher);
+ pubsub_get_topic_properties_data_t *data = handle;
+ data->outEndpoint = pubsub_utils_getTopicProperties(bnd, data->topic, data->isPublisher);
}
double pubsub_utils_matchSubscriber(
- celix_bundle_context_t *ctx,
- const long svcProviderBundleId,
- const celix_properties_t *svcProperties,
- const char *adminType,
- double sampleScore,
- double controlScore,
- double defaultScore,
- celix_properties_t **outTopicProperties,
- long *outSerializerSvcId) {
-
- pubsub_get_topic_properties_data_t data;
- data.isPublisher = false;
- data.topic = celix_properties_get(svcProperties, PUBSUB_SUBSCRIBER_TOPIC, NULL);
- data.outEndpoint = NULL;
- celix_bundleContext_useBundle(ctx, svcProviderBundleId, &data, getTopicPropertiesCallback);
-
- celix_properties_t *ep = data.outEndpoint;
- const char *requested_admin = NULL;
- const char *requested_qos = NULL;
- const char *requested_serializer = NULL;
- if (ep != NULL) {
- requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
- requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
- requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
- }
-
- double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
-
- long serializerSvcId = getPSASerializer(ctx, requested_serializer);
- if (serializerSvcId < 0) {
- score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
- }
-
- if (outSerializerSvcId != NULL) {
- *outSerializerSvcId = serializerSvcId;
- }
-
- if (outTopicProperties != NULL) {
- *outTopicProperties = ep;
- } else if (ep != NULL) {
- celix_properties_destroy(ep);
- }
-
- return score;
+ celix_bundle_context_t *ctx,
+ const long svcProviderBundleId,
+ const celix_properties_t *svcProperties,
+ const char *adminType,
+ double sampleScore,
+ double controlScore,
+ double defaultScore,
+ celix_properties_t **outTopicProperties,
+ long *outSerializerSvcId) {
+
+ pubsub_get_topic_properties_data_t data;
+ data.isPublisher = false;
+ data.topic = celix_properties_get(svcProperties, PUBSUB_SUBSCRIBER_TOPIC, NULL);
+ data.outEndpoint = NULL;
+ celix_bundleContext_useBundle(ctx, svcProviderBundleId, &data, getTopicPropertiesCallback);
+
+ celix_properties_t *ep = data.outEndpoint;
+ const char *requested_admin = NULL;
+ const char *requested_qos = NULL;
+ const char *requested_serializer = NULL;
+ if (ep != NULL) {
+ requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+ requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
+ requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+ }
+
+ double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
+
+ long serializerSvcId = getPSASerializer(ctx, requested_serializer);
+ if (serializerSvcId < 0) {
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
+ }
+
+ if (outSerializerSvcId != NULL) {
+ *outSerializerSvcId = serializerSvcId;
+ }
+
+ if (outTopicProperties != NULL) {
+ *outTopicProperties = ep;
+ } else if (ep != NULL) {
+ celix_properties_destroy(ep);
+ }
+
+ return score;
}
bool pubsub_utils_matchEndpoint(
- celix_bundle_context_t *ctx,
- const celix_properties_t *ep,
- const char *adminType,
- long *outSerializerSvcId) {
-
- bool psaMatch = false;
- const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
- if (configured_admin != NULL) {
- psaMatch = strncmp(configured_admin, adminType, strlen(adminType)) == 0;
- }
-
- bool serMatch = false;
- long serializerSvcId = -1L;
- if (psaMatch) {
- const char *configured_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
- serializerSvcId = getPSASerializer(ctx, configured_serializer);
- serMatch = serializerSvcId >= 0;
- }
-
- bool match = psaMatch && serMatch;
-// printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
-
- if (outSerializerSvcId != NULL) {
- *outSerializerSvcId = serializerSvcId;
- }
-
- return match;
+ celix_bundle_context_t *ctx,
+ const celix_properties_t *ep,
+ const char *adminType,
+ long *outSerializerSvcId) {
+
+ bool psaMatch = false;
+ const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+ if (configured_admin != NULL) {
+ psaMatch = strncmp(configured_admin, adminType, strlen(adminType)) == 0;
+ }
+
+ bool serMatch = false;
+ long serializerSvcId = -1L;
+ if (psaMatch) {
+ const char *configured_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+ serializerSvcId = getPSASerializer(ctx, configured_serializer);
+ serMatch = serializerSvcId >= 0;
+ }
+
+ bool match = psaMatch && serMatch;
+// printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
+
+ if (outSerializerSvcId != NULL) {
+ *outSerializerSvcId = serializerSvcId;
+ }
+
+ return match;
}
diff --git a/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
index 4bba4c5..1f2e1b9 100644
--- a/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -18,26 +18,26 @@
find_package(UUID REQUIRED)
if(NOT UUID_LIBRARY)
- #i.e. not found for OSX
- set(UUID_LIBRARY "")
- set(UUID_INCLUDE_DIRS "")
+ #i.e. not found for OSX
+ set(UUID_LIBRARY "")
+ set(UUID_INCLUDE_DIRS "")
endif()
add_celix_bundle(celix_pubsub_topology_manager
BUNDLE_SYMBOLICNAME "apache_celix_pubsub_topology_manager"
VERSION "1.0.0"
- GROUP "Celix/PubSub"
+ GROUP "Celix/PubSub"
SOURCES
- src/pstm_activator.c
- src/pubsub_topology_manager.c
- src/pubsub_topology_manager.h
+ src/pstm_activator.c
+ src/pubsub_topology_manager.c
+ src/pubsub_topology_manager.h
)
target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi Celix::shell_api)
target_include_directories(celix_pubsub_topology_manager PRIVATE ${UUID_INCLUDE_DIRS})
get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
celix_bundle_files(celix_pubsub_topology_manager
- ${DESC}
+ ${DESC}
DESTINATION "META-INF/descriptors/services"
)
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
index 93f6836..b4811a2 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
@@ -33,140 +33,140 @@
#include "pubsub_listeners.h"
typedef struct pstm_activator {
- pubsub_topology_manager_t *manager;
+ pubsub_topology_manager_t *manager;
- long pubsubDiscoveryTrackerId;
- long pubsubAdminTrackerId;
- long pubsubSubscribersTrackerId;
- long pubsubPublishServiceTrackerId;
- long pubsubPSAMetricsTrackerId;
+ long pubsubDiscoveryTrackerId;
+ long pubsubAdminTrackerId;
+ long pubsubSubscribersTrackerId;
+ long pubsubPublishServiceTrackerId;
+ long pubsubPSAMetricsTrackerId;
- pubsub_discovered_endpoint_listener_t discListenerSvc;
- long discListenerSvcId;
+ pubsub_discovered_endpoint_listener_t discListenerSvc;
+ long discListenerSvcId;
- command_service_t shellCmdSvc;
- long shellCmdSvcId;
+ command_service_t shellCmdSvc;
+ long shellCmdSvcId;
- log_helper_pt loghelper;
+ log_helper_pt loghelper;
} pstm_activator_t;
static int pstm_start(pstm_activator_t *act, celix_bundle_context_t *ctx) {
- celix_status_t status;
-
- act->discListenerSvcId = -1L;
- act->pubsubSubscribersTrackerId = -1L;
- act->pubsubAdminTrackerId = -1L;
- act->pubsubDiscoveryTrackerId = -1L;
- act->pubsubPublishServiceTrackerId = -1L;
- act->pubsubPSAMetricsTrackerId = -1L;
- act->shellCmdSvcId = -1L;
-
- logHelper_create(ctx, &act->loghelper);
- logHelper_start(act->loghelper);
-
- status = pubsub_topologyManager_create(ctx, act->loghelper, &act->manager);
-
- //track pubsub announce endpoint listener services (pubsub discovery components)
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.addWithProperties = pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded;
- opts.removeWithProperties = pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved;
- opts.callbackHandle = act->manager;
- opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE;
- act->pubsubDiscoveryTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
- //track pubsub admin service (psa components)
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.addWithProperties = pubsub_topologyManager_psaAdded;
- opts.removeWithProperties = pubsub_topologyManager_psaRemoved;
- opts.callbackHandle = act->manager;
- opts.filter.serviceName = PUBSUB_ADMIN_SERVICE_NAME;
+ celix_status_t status;
+
+ act->discListenerSvcId = -1L;
+ act->pubsubSubscribersTrackerId = -1L;
+ act->pubsubAdminTrackerId = -1L;
+ act->pubsubDiscoveryTrackerId = -1L;
+ act->pubsubPublishServiceTrackerId = -1L;
+ act->pubsubPSAMetricsTrackerId = -1L;
+ act->shellCmdSvcId = -1L;
+
+ logHelper_create(ctx, &act->loghelper);
+ logHelper_start(act->loghelper);
+
+ status = pubsub_topologyManager_create(ctx, act->loghelper, &act->manager);
+
+ //track pubsub announce endpoint listener services (pubsub discovery components)
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.addWithProperties = pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded;
+ opts.removeWithProperties = pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved;
+ opts.callbackHandle = act->manager;
+ opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE;
+ act->pubsubDiscoveryTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //track pubsub admin service (psa components)
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.addWithProperties = pubsub_topologyManager_psaAdded;
+ opts.removeWithProperties = pubsub_topologyManager_psaRemoved;
+ opts.callbackHandle = act->manager;
+ opts.filter.serviceName = PUBSUB_ADMIN_SERVICE_NAME;
opts.filter.ignoreServiceLanguage = true;
- act->pubsubAdminTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
- //track subscriber service
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.addWithOwner = pubsub_topologyManager_subscriberAdded;
- opts.removeWithOwner = pubsub_topologyManager_subscriberRemoved;
- opts.callbackHandle = act->manager;
- opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+ act->pubsubAdminTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //track subscriber service
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.addWithOwner = pubsub_topologyManager_subscriberAdded;
+ opts.removeWithOwner = pubsub_topologyManager_subscriberRemoved;
+ opts.callbackHandle = act->manager;
+ opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.ignoreServiceLanguage = true;
- act->pubsubSubscribersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
-
- //track requests for publisher services
- if (status == CELIX_SUCCESS) {
- act->pubsubPublishServiceTrackerId = celix_bundleContext_trackServiceTrackers(ctx,
- PUBSUB_PUBLISHER_SERVICE_NAME,
- act->manager,
- pubsub_topologyManager_publisherTrackerAdded,
- pubsub_topologyManager_publisherTrackerRemoved);
- }
-
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.callbackHandle = act->manager;
- opts.addWithProperties = pubsub_topologyManager_addMetricsService;
- opts.removeWithProperties = pubsub_topologyManager_removeMetricsService;
- opts.filter.serviceName = PUBSUB_ADMIN_METRICS_SERVICE_NAME;
- act->pubsubPSAMetricsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
- //register discovered endpoints listener service (called by pubsub discovery components)
- if (status == CELIX_SUCCESS) {
- act->discListenerSvc.handle = act->manager;
- act->discListenerSvc.addDiscoveredEndpoint = pubsub_topologyManager_addDiscoveredEndpoint;
- act->discListenerSvc.removeDiscoveredEndpoint = pubsub_topologyManager_removeDiscoveredEndpoint;
- act->discListenerSvcId = celix_bundleContext_registerService(ctx, &act->discListenerSvc, PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE, NULL);
- }
-
- //register shell command
- if (status == CELIX_SUCCESS) {
- act->shellCmdSvc.handle = act->manager;
- act->shellCmdSvc.executeCommand = pubsub_topologyManager_shellCommand;
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "pstm");
- celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "pstm [topology|metrics]"); //TODO add search topic/scope option
- celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "pubsub_topology_info: Overview of Topology information for PubSub");
- act->shellCmdSvcId = celix_bundleContext_registerService(ctx, &act->shellCmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
- }
-
- //TODO add tracker for pubsub_serializer and
- //1) on remove reset sender/receivers entries
- //2) on add indicate that topic/senders should be reevaluated.
-
- /* NOTE: Enable those line in order to remotely expose the topic_info service
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, OSGI_RSA_SERVICE_EXPORTED_INTERFACES, PUBSUB_TOPIC_INFO_SERVICE);
- status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
- */
-
- return 0;
+ act->pubsubSubscribersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+
+ //track requests for publisher services
+ if (status == CELIX_SUCCESS) {
+ act->pubsubPublishServiceTrackerId = celix_bundleContext_trackServiceTrackers(ctx,
+ PUBSUB_PUBLISHER_SERVICE_NAME,
+ act->manager,
+ pubsub_topologyManager_publisherTrackerAdded,
+ pubsub_topologyManager_publisherTrackerRemoved);
+ }
+
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.callbackHandle = act->manager;
+ opts.addWithProperties = pubsub_topologyManager_addMetricsService;
+ opts.removeWithProperties = pubsub_topologyManager_removeMetricsService;
+ opts.filter.serviceName = PUBSUB_ADMIN_METRICS_SERVICE_NAME;
+ act->pubsubPSAMetricsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //register discovered endpoints listener service (called by pubsub discovery components)
+ if (status == CELIX_SUCCESS) {
+ act->discListenerSvc.handle = act->manager;
+ act->discListenerSvc.addDiscoveredEndpoint = pubsub_topologyManager_addDiscoveredEndpoint;
+ act->discListenerSvc.removeDiscoveredEndpoint = pubsub_topologyManager_removeDiscoveredEndpoint;
+ act->discListenerSvcId = celix_bundleContext_registerService(ctx, &act->discListenerSvc, PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE, NULL);
+ }
+
+ //register shell command
+ if (status == CELIX_SUCCESS) {
+ act->shellCmdSvc.handle = act->manager;
+ act->shellCmdSvc.executeCommand = pubsub_topologyManager_shellCommand;
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "pstm");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "pstm [topology|metrics]"); //TODO add search topic/scope option
+ celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "pubsub_topology_info: Overview of Topology information for PubSub");
+ act->shellCmdSvcId = celix_bundleContext_registerService(ctx, &act->shellCmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ }
+
+ //TODO add tracker for pubsub_serializer and
+ //1) on remove reset sender/receivers entries
+ //2) on add indicate that topic/senders should be reevaluated.
+
+ /* NOTE: Enable these lines in order to remotely expose the topic_info service
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, OSGI_RSA_SERVICE_EXPORTED_INTERFACES, PUBSUB_TOPIC_INFO_SERVICE);
+ status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
+ */
+
+ return 0;
}
static int pstm_stop(pstm_activator_t *act, celix_bundle_context_t *ctx) {
- celix_bundleContext_stopTracker(ctx, act->pubsubSubscribersTrackerId);
- celix_bundleContext_stopTracker(ctx, act->pubsubDiscoveryTrackerId);
- celix_bundleContext_stopTracker(ctx, act->pubsubAdminTrackerId);
- celix_bundleContext_stopTracker(ctx, act->pubsubPublishServiceTrackerId);
- celix_bundleContext_stopTracker(ctx, act->pubsubPSAMetricsTrackerId);
- celix_bundleContext_unregisterService(ctx, act->discListenerSvcId);
- celix_bundleContext_unregisterService(ctx, act->shellCmdSvcId);
+ celix_bundleContext_stopTracker(ctx, act->pubsubSubscribersTrackerId);
+ celix_bundleContext_stopTracker(ctx, act->pubsubDiscoveryTrackerId);
+ celix_bundleContext_stopTracker(ctx, act->pubsubAdminTrackerId);
+ celix_bundleContext_stopTracker(ctx, act->pubsubPublishServiceTrackerId);
+ celix_bundleContext_stopTracker(ctx, act->pubsubPSAMetricsTrackerId);
+ celix_bundleContext_unregisterService(ctx, act->discListenerSvcId);
+ celix_bundleContext_unregisterService(ctx, act->shellCmdSvcId);
- pubsub_topologyManager_destroy(act->manager);
+ pubsub_topologyManager_destroy(act->manager);
- logHelper_stop(act->loghelper);
- logHelper_destroy(&act->loghelper);
+ logHelper_stop(act->loghelper);
+ logHelper_destroy(&act->loghelper);
- return 0;
+ return 0;
}
CELIX_GEN_BUNDLE_ACTIVATOR(pstm_activator_t, pstm_start, pstm_stop);
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 1c0748a..294b0de 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -43,7 +43,7 @@
#define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS 30L
#ifndef UUID_STR_LEN
-#define UUID_STR_LEN 37
+#define UUID_STR_LEN 37
#endif
static void *pstm_psaHandlingThread(void *data);
@@ -214,8 +214,8 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
if (needsRematchCount > 0) {
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO,
"A PSA is added after at least one active publisher/provided. \
- It is preferred that all PSA are started before publiser/subscriber are started!\n\
- Current topic/sender count is %i", needsRematchCount);
+ It is preferred that all PSA are started before publiser/subscriber are started!\n\
+ Current topic/sender count is %i", needsRematchCount);
}
}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index ed13a47..9dcc13a 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -31,80 +31,80 @@
#include "pubsub/publisher.h"
#include "pubsub/subscriber.h"
-#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
+#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
+#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
typedef struct pubsub_topology_manager {
- bundle_context_pt context;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = svcId, value = pubsub_admin_t*
- } pubsubadmins;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t
- } discoveredEndpoints;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
- } topicReceivers;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
- } topicSenders;
-
- struct {
- celix_thread_mutex_t mutex;
- celix_array_list_t *list; //<pubsub_announce_endpoint_listener_t*>
- } announceEndpointListeners;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = svcId, value = pubsub_admin_metrics_service_t*
- } psaMetrics;
-
- struct {
- celix_thread_t thread;
- celix_thread_mutex_t mutex; //protect running and condition
- celix_thread_cond_t cond;
- bool running;
- } psaHandling;
-
- log_helper_pt loghelper;
-
- bool verbose;
+ bundle_context_pt context;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = pubsub_admin_t*
+ } pubsubadmins;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t
+ } discoveredEndpoints;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
+ } topicReceivers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t*
+ } topicSenders;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ celix_array_list_t *list; //<pubsub_announce_endpoint_listener_t*>
+ } announceEndpointListeners;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = pubsub_admin_metrics_service_t*
+ } psaMetrics;
+
+ struct {
+ celix_thread_t thread;
+ celix_thread_mutex_t mutex; //protect running and condition
+ celix_thread_cond_t cond;
+ bool running;
+ } psaHandling;
+
+ log_helper_pt loghelper;
+
+ bool verbose;
} pubsub_topology_manager_t;
typedef struct pstm_discovered_endpoint_entry {
- const char *uuid;
- long selectedPsaSvcId; // -1L, indicates no selected psa
- int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
- celix_properties_t *endpoint;
+ const char *uuid;
+ long selectedPsaSvcId; // -1L, indicates no selected psa
+ int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
+ celix_properties_t *endpoint;
} pstm_discovered_endpoint_entry_t;
typedef struct pstm_topic_receiver_or_sender_entry {
- bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
-
- char *scopeAndTopicKey; //key of the combined value of the scope and topic
- celix_properties_t *endpoint;
- char *topic;
- char *scope;
- int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
- long selectedPsaSvcId;
- long selectedSerializerSvcId;
- long bndId;
- celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
-
- //for sender entry
- celix_filter_t *publisherFilter;
-
- //for receiver entry
- celix_properties_t *subscriberProperties;
+ bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
+
+ char *scopeAndTopicKey; //key of the combined value of the scope and topic
+ celix_properties_t *endpoint;
+ char *topic;
+ char *scope;
+ int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
+ long selectedPsaSvcId;
+ long selectedSerializerSvcId;
+ long bndId;
+ celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
+
+ //for sender entry
+ celix_filter_t *publisherFilter;
+
+ //for receiver entry
+ celix_properties_t *subscriberProperties;
} pstm_topic_receiver_or_sender_entry_t;
celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/test/test/sut_activator.c
index 7fed1ff..adb6598 100644
--- a/bundles/pubsub/test/test/sut_activator.c
+++ b/bundles/pubsub/test/test/sut_activator.c
@@ -30,86 +30,86 @@ static void sut_pubSet(void *handle, void *service);
static void* sut_sendThread(void *data);
struct activator {
- long pubTrkId;
+ long pubTrkId;
- pthread_t sendThread;
+ pthread_t sendThread;
- pthread_mutex_t mutex;
- bool running;
- pubsub_publisher_t* pubSvc;
+ pthread_mutex_t mutex;
+ bool running;
+ pubsub_publisher_t* pubSvc;
};
celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
- char filter[512];
- snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping");
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.set = sut_pubSet;
- opts.callbackHandle = act;
- opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
- opts.filter.filter = filter;
- act->pubTrkId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ char filter[512];
+ snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping");
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.set = sut_pubSet;
+ opts.callbackHandle = act;
+ opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+ opts.filter.filter = filter;
+ act->pubTrkId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- act->running = true;
- pthread_create(&act->sendThread, NULL, sut_sendThread, act);
+ act->running = true;
+ pthread_create(&act->sendThread, NULL, sut_sendThread, act);
- return CELIX_SUCCESS;
+ return CELIX_SUCCESS;
}
celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
- pthread_mutex_lock(&act->mutex);
- act->running = false;
- pthread_mutex_unlock(&act->mutex);
- pthread_join(act->sendThread, NULL);
- pthread_mutex_destroy(&act->mutex);
-
- celix_bundleContext_stopTracker(ctx, act->pubTrkId);
- return CELIX_SUCCESS;
+ pthread_mutex_lock(&act->mutex);
+ act->running = false;
+ pthread_mutex_unlock(&act->mutex);
+ pthread_join(act->sendThread, NULL);
+ pthread_mutex_destroy(&act->mutex);
+
+ celix_bundleContext_stopTracker(ctx, act->pubTrkId);
+ return CELIX_SUCCESS;
}
CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop);
static void sut_pubSet(void *handle, void *service) {
- struct activator* act = handle;
- pthread_mutex_lock(&act->mutex);
- act->pubSvc = service;
- pthread_mutex_unlock(&act->mutex);
+ struct activator* act = handle;
+ pthread_mutex_lock(&act->mutex);
+ act->pubSvc = service;
+ pthread_mutex_unlock(&act->mutex);
}
static void* sut_sendThread(void *data) {
- struct activator *act = data;
+ struct activator *act = data;
- pthread_mutex_lock(&act->mutex);
- bool running = act->running;
- pthread_mutex_unlock(&act->mutex);
+ pthread_mutex_lock(&act->mutex);
+ bool running = act->running;
+ pthread_mutex_unlock(&act->mutex);
- unsigned int msgId = 0;
- msg_t msg;
- msg.seqNr = 1;
+ unsigned int msgId = 0;
+ msg_t msg;
+ msg.seqNr = 1;
- while (running) {
- pthread_mutex_lock(&act->mutex);
- if (act->pubSvc != NULL) {
- if (msgId == 0) {
- act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
- }
+ while (running) {
+ pthread_mutex_lock(&act->mutex);
+ if (act->pubSvc != NULL) {
+ if (msgId == 0) {
+ act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
+ }
- act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
+ act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
if (msg.seqNr % 1000 == 0) {
printf("Send %i messages\n", msg.seqNr);
}
- msg.seqNr += 1;
+ msg.seqNr += 1;
}
pthread_mutex_unlock(&act->mutex);
- usleep(10000);
+ usleep(10000);
- pthread_mutex_lock(&act->mutex);
- running = act->running;
- pthread_mutex_unlock(&act->mutex);
- }
+ pthread_mutex_lock(&act->mutex);
+ running = act->running;
+ pthread_mutex_unlock(&act->mutex);
+ }
printf("Send %i messages\n", msg.seqNr);
return NULL;
diff --git a/bundles/pubsub/test/test/tst_activator.cc b/bundles/pubsub/test/test/tst_activator.cc
index 71dc5fc..4430adb 100644
--- a/bundles/pubsub/test/test/tst_activator.cc
+++ b/bundles/pubsub/test/test/tst_activator.cc
@@ -89,13 +89,13 @@ static int tst_receive(void *handle, const char * /*msgType*/, unsigned int /*ms
TEST_GROUP(PUBSUB_INT_GROUP)
{
- void setup() {
- //nop
- }
+ void setup() {
+ //nop
+ }
- void teardown() {
- //nop
- }
+ void teardown() {
+ //nop
+ }
};
TEST(PUBSUB_INT_GROUP, recvTest) {