You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:19 UTC
[02/34] celix git commit: CELIX-454: Refactors pubsub discovery and
topology manager trying to prevent race conditions
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 11e7396..4285271 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -27,6 +27,7 @@
#include <string.h>
#include <stdlib.h>
#include <uuid/uuid.h>
+#include <celix_api.h>
#include "celix_errno.h"
#include "celix_log.h"
@@ -38,59 +39,56 @@
#include "pubsub_utils.h"
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId,const char* endpoint, const char *pubsubType, properties_pt topic_props);
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *admin, const char *ser, const celix_properties_t* topic_props);
static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher);
-static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp);
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char *pubsubType, properties_pt topic_props) {
+/**
+ * Fills in the property set of an endpoint and refreshes its cached field pointers.
+ *
+ * When psEp has no property set yet one is created: a copy of topic_props if that is
+ * given, an empty set otherwise. Each non-NULL argument then overwrites the matching
+ * PUBSUB_ENDPOINT_* entry; a NULL argument leaves an entry that came in through
+ * topic_props untouched. Finally the convenience members of psEp (topicName, uuid, ...)
+ * are re-read from the property set.
+ *
+ * @param psEp endpoint to fill in; must not be NULL.
+ * @param fwUUID framework UUID, or NULL to keep the current entry.
+ * @param scope topic scope, or NULL to keep the current entry.
+ * @param topic topic name, or NULL to keep the current entry.
+ * @param pubsubType endpoint type, or NULL to keep the current entry.
+ * @param adminType admin type, or NULL to keep the current entry.
+ * @param serType serializer type, or NULL to keep the current entry.
+ * @param topic_props optional properties to start from; they are copied, the caller keeps ownership.
+ */
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const celix_properties_t *topic_props) {
- if (psEp->endpoint_props == NULL) {
- psEp->endpoint_props = properties_create();
+ if (psEp->properties == NULL) {
+ if (topic_props != NULL) {
+ psEp->properties = celix_properties_copy(topic_props);
+ } else {
+ psEp->properties = properties_create();
+ }
}
+ // Only mint a new UUID when the (possibly copied) properties do not carry one yet.
+ // pubsubEndpoint_clone() and pubsubEndpoint_createFromProperties() build endpoints from an
+ // existing property set, and pubsubEndpoint_equals() compares on this UUID: overwriting it
+ // would make a clone never equal to the endpoint it was cloned from.
+ if (celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_UUID, NULL) == NULL) {
char endpointUuid[37];
-
uuid_t endpointUid;
uuid_generate(endpointUid);
uuid_unparse(endpointUid, endpointUuid);
-
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_UUID, endpointUuid);
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_UUID, endpointUuid);
+ }
if (fwUUID != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
}
if (scope != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
}
if (topic != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME, topic);
- }
-
- char idBuf[32];
-
- if (bundleId >= 0) {
- snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID, idBuf);
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_TOPIC_NAME, topic);
}
- if (serviceId >= 0) {
- snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, idBuf);
+ if (pubsubType != NULL) {
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_TYPE, pubsubType);
}
- if(endpoint != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, endpoint);
+ if (adminType != NULL) {
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, adminType);
}
- if (pubsubType != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TYPE, pubsubType);
+ if (serType != NULL) {
+ celix_properties_set(psEp->properties, PUBSUB_ENDPOINT_SERIALIZER, serType);
}
- if(topic_props != NULL) {
- properties_copy(topic_props, &(psEp->topic_props));
- }
+ // Refresh the cached members from the property set after all updates.
+ psEp->topicName = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ psEp->topicScope = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ psEp->uuid = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_UUID, NULL);
+ psEp->frameworkUUid = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
+ psEp->type = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_TYPE, NULL);
+ psEp->adminType = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+ psEp->serializerType = celix_properties_get(psEp->properties, PUBSUB_ENDPOINT_SERIALIZER, NULL);
}
static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){
@@ -125,29 +123,14 @@ static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const c
return topic_props;
}
-celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value) {
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props, pubsub_endpoint_t **out) {
celix_status_t status = CELIX_SUCCESS;
- if (ep->endpoint_props == NULL) {
- printf("PUBSUB_EP: No endpoint_props for endpoint available!\n");
- return CELIX_ILLEGAL_STATE;
- }
+ pubsub_endpoint_t *psEp = calloc(1, sizeof(*psEp));
- if (key != NULL && value != NULL) {
- properties_set(ep->endpoint_props, key, value);
- }
+ pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
- return status;
-}
-
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props,pubsub_endpoint_pt* out){
- celix_status_t status = CELIX_SUCCESS;
-
- pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
-
- pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, serviceId, endpoint, pubsubType, topic_props);
-
- if (!pubsubEndpoint_isEndpointValid(psEp)) {
+ if (!pubsubEndpoint_isValid(psEp->properties, true, true)) {
status = CELIX_ILLEGAL_STATE;
}
@@ -162,61 +145,29 @@ celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, cons
}
celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){
- celix_status_t status = CELIX_SUCCESS;
-
- pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
-
- status = properties_copy(in->endpoint_props, &(ep->endpoint_props));
-
- if (in->topic_props != NULL) {
- status += properties_copy(in->topic_props, &(ep->topic_props));
- }
-
- if (status == CELIX_SUCCESS) {
- *out = ep;
- } else {
- pubsubEndpoint_destroy(ep);
- }
-
- return status;
+ // A clone is a new endpoint built from the source endpoint's property set; the status
+ // of that construction is handed back to the caller unchanged.
+ const celix_properties_t *sourceProps = in->properties;
+ return pubsubEndpoint_createFromProperties(sourceProps, out);
}
-celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t *ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out){
+celix_status_t pubsubEndpoint_createFromSvc(bundle_context_t* ctx, const celix_bundle_t *bnd, const celix_properties_t *svcProps, bool isPublisher, pubsub_endpoint_pt* out) {
celix_status_t status = CELIX_SUCCESS;
pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
- const char* fwUUID = NULL;
- bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-
- const char* scope = NULL;
- serviceReference_getPropertyWithDefault(reference, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, &scope);
-
- const char* topic = NULL;
- serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
-
- const char* serviceId = NULL;
- serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
-
-
- long bundleId = -1;
- bundle_pt bundle = NULL;
- serviceReference_getBundle(reference, &bundle);
- if (bundle != NULL) {
- bundle_getBundleId(bundle, &bundleId);
- }
+ const char* fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+ const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+ const char* topic = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_TOPIC, NULL);
/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
- properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+ celix_properties_t *topic_props = pubsubEndpoint_getTopicProperties((celix_bundle_t*)bnd, topic, isPublisher);
const char *pubsubType = isPublisher ? PUBSUB_PUBLISHER_ENDPOINT_TYPE : PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, bundleId, strtol(serviceId,NULL,10), NULL, pubsubType, topic_props);
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, topic_props);
if(topic_props != NULL){
celix_properties_destroy(topic_props); //Can be deleted since setFields invokes properties_copy
}
- if (!pubsubEndpoint_isEndpointValid(ep)) {
+ if (!pubsubEndpoint_isValid(ep->properties, true, true)) {
status = CELIX_ILLEGAL_STATE;
}
@@ -230,31 +181,18 @@ celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t *ctx,
}
-celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out) {
- celix_status_t status = CELIX_SUCCESS;
-
- pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
+struct retrieve_topic_properties_data { /* Argument bundle for retrieveTopicProperties(); a pointer to it
+ * is passed as the callback handle to celix_bundleContext_useBundle(). */
+ celix_properties_t *props; // out: result of pubsubEndpoint_getTopicProperties(); the caller presets it to NULL
+ const char *topic; // in: topic name to look up
+ bool isPublisher; // in: forwarded as the isPublisher argument of the lookup
+};
- if (psEp == NULL) {
- return CELIX_ENOMEM;
- }
-
- psEp->endpoint_props = discoveredProperties;
-
- if (!pubsubEndpoint_isEndpointValid(psEp)) {
- status = CELIX_ILLEGAL_STATE;
- }
-
- if (status == CELIX_SUCCESS) {
- *out = psEp;
- } else {
- pubsubEndpoint_destroy(psEp);
- }
-
- return status;
+/**
+ * Bundle callback that looks up the topic properties for the bundle it is invoked with.
+ *
+ * @param handle pointer to a struct retrieve_topic_properties_data; its topic and isPublisher
+ *               members are the inputs, its props member receives the result.
+ * @param bnd bundle to read the topic properties from.
+ */
+static void retrieveTopicProperties(void *handle, const celix_bundle_t *bnd) {
+ struct retrieve_topic_properties_data *request = handle;
+ bundle_pt bundle = (bundle_pt)bnd;
+ request->props = pubsubEndpoint_getTopicProperties(bundle, request->topic, request->isPublisher);
}
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out){
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, const celix_service_tracker_info_t *info, bool isPublisher, pubsub_endpoint_pt* out) {
celix_status_t status = CELIX_SUCCESS;
const char* fwUUID=NULL;
@@ -266,7 +204,8 @@ celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx,
char* topic = NULL;
char* scope = NULL;
- pubsub_getPubSubInfoFromFilter(info->filter, &topic, &scope);
+ const char *filterStr = celix_filter_getFilterString(info->filter);
+ pubsub_getPubSubInfoFromFilter(filterStr, &topic, &scope);
if (topic==NULL) {
free(scope);
@@ -278,22 +217,21 @@ celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx,
pubsub_endpoint_pt psEp = calloc(1, sizeof(**out));
- bundle_pt bundle = NULL;
- long bundleId = -1;
- bundleContext_getBundle(info->context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+ struct retrieve_topic_properties_data data;
+ data.props = NULL;
+ data.isPublisher = isPublisher;
+ data.topic = topic;
+ celix_bundleContext_useBundle(ctx, info->bundleId, &data, retrieveTopicProperties);
/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
- pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, -1, NULL, PUBSUB_PUBLISHER_ENDPOINT_TYPE, topic_props);
+ pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, data.props);
free(scope);
free(topic);
- if(topic_props != NULL){
- celix_properties_destroy(topic_props); //Can be deleted since setFields invokes properties_copy
+ if (data.props != NULL) {
+ celix_properties_destroy(data.props); //Can be deleted since setFields invokes properties_copy
}
- if (!pubsubEndpoint_isEndpointValid(psEp)) {
+ if (!pubsubEndpoint_isValid(psEp->properties, false, false)) {
status = CELIX_ILLEGAL_STATE;
}
@@ -309,26 +247,27 @@ celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx,
void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
if (psEp == NULL) return;
- if(psEp->topic_props != NULL){
- properties_destroy(psEp->topic_props);
- }
-
- if (psEp->endpoint_props != NULL) {
- properties_destroy(psEp->endpoint_props);
+ if(psEp->properties != NULL){ /* Releases an endpoint: its single property set (which replaces the former
+ * endpoint_props/topic_props pair) is destroyed first, then the struct itself. A NULL
+ * endpoint is accepted and ignored. */
+ celix_properties_destroy(psEp->properties);
}
free(psEp);
-
- return;
-
}
bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
+ // Two endpoints can only match when both exist; the comparison itself is delegated
+ // to pubsubEndpoint_equalsWithProperties() using the second endpoint's property set.
+ if (psEp1 == NULL || psEp2 == NULL) {
+ return false;
+ }
+ return pubsubEndpoint_equalsWithProperties(psEp1, psEp2->properties);
+}
- if (psEp1->endpoint_props && psEp2->endpoint_props) {
- return !strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_UUID),
- properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_UUID));
- }else {
+/**
+ * Tells whether an endpoint and a property set carry the same PUBSUB_ENDPOINT_UUID.
+ *
+ * @param psEp1 endpoint to compare; must not be NULL (it is dereferenced unconditionally).
+ * @param props property set to compare against; may be NULL.
+ * @return true when both property sets exist and their UUID entries are equal strings,
+ *         false otherwise.
+ */
+bool pubsubEndpoint_equalsWithProperties(pubsub_endpoint_pt psEp1, const celix_properties_t *props) {
+ if (psEp1->properties != NULL && props != NULL) {
+ // The two lookups use different fallback strings, so a side without a UUID entry
+ // compares as unequal instead of handing NULL to strcmp().
+ const char *ownUuid = celix_properties_get(psEp1->properties, PUBSUB_ENDPOINT_UUID, "entry1");
+ const char *otherUuid = celix_properties_get(props, PUBSUB_ENDPOINT_UUID, "entry2");
+ return strcmp(ownUuid, otherUuid) == 0;
+ } else {
return false;
}
}
@@ -340,37 +279,57 @@ char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic)
return result;
}
+/**
+ * Creates a new endpoint from an existing property set.
+ *
+ * The endpoint gets its own property set (pubsubEndpoint_setFields() copies props, or starts
+ * from an empty set when props is NULL) and is then validated with both the admin type and
+ * the serializer type flagged as required.
+ *
+ * @param props properties to build the endpoint from; the caller keeps ownership.
+ * @param out receives the new endpoint on success and NULL on any failure.
+ * @return CELIX_SUCCESS on success, CELIX_ENOMEM when the endpoint cannot be allocated,
+ *         CELIX_BUNDLE_EXCEPTION when the resulting endpoint is not valid.
+ */
+celix_status_t pubsubEndpoint_createFromProperties(const celix_properties_t *props, pubsub_endpoint_t **out) {
+ pubsub_endpoint_t *ep = calloc(1, sizeof(*ep));
+ if (ep == NULL) {
+ // setFields() dereferences the endpoint, so an allocation failure must stop here.
+ *out = NULL;
+ return CELIX_ENOMEM;
+ }
+ pubsubEndpoint_setFields(ep, NULL, NULL, NULL, NULL, NULL, NULL, props);
+ if (!pubsubEndpoint_isValid(ep->properties, true, true)) {
+ *out = NULL;
+ pubsubEndpoint_destroy(ep);
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+ *out = ep;
+ return CELIX_SUCCESS;
+}
-static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp) {
- //required properties
- bool valid = true;
- static const char* keys[] = {
- PUBSUB_ENDPOINT_UUID,
- PUBSUB_ENDPOINT_FRAMEWORK_UUID,
- PUBSUB_ENDPOINT_TYPE,
- PUBSUB_ENDPOINT_TOPIC_NAME,
- PUBSUB_ENDPOINT_TOPIC_SCOPE,
- NULL };
- int i;
- for (i = 0; keys[i] != NULL; ++i) {
- const char *val = properties_get(psEp->endpoint_props, keys[i]);
- if (val == NULL) { //missing required key
- fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint missing key: '%s'\n", keys[i]);
- valid = false;
- }
+/**
+ * Checks that a property set has an entry for key, reporting a missing one on stderr.
+ *
+ * @param props property set to inspect.
+ * @param key name of the mandatory entry.
+ * @return true when the entry is present, false otherwise.
+ */
+static bool checkProp(const celix_properties_t *props, const char *key) {
+ bool present = celix_properties_get(props, key, NULL) != NULL;
+ if (!present) {
+ fprintf(stderr, "[Error] Missing mandatory entry for endpoint. Missing key is '%s'\n", key);
}
- if (!valid) {
- const char *key = NULL;
- fprintf(stderr, "PubSubEndpoint entries:\n");
- PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
- fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
- }
- if (psEp->topic_props != NULL) {
- fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
- PROPERTIES_FOR_EACH(psEp->topic_props, key) {
- fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
- }
- }
+ return present;
+}
+
+
+/**
+ * Checks whether a property set describes a usable endpoint.
+ *
+ * The UUID, framework UUID, endpoint type, topic name and topic scope entries must all be
+ * present. Every check is always evaluated (no early exit), so checkProp() reports each
+ * missing entry on stderr.
+ *
+ * NOTE(review): when requireAdminType / requireSerializerType is set, the matching entry is
+ * checked and a missing one is logged, but the outcome is discarded, so it never makes this
+ * function return false. That looks unintended. However, pubsubEndpoint_createFromSvc()
+ * validates with (true, true) right after passing NULL admin and serializer types to
+ * pubsubEndpoint_setFields(), so enforcing the result has to be coordinated with the callers.
+ *
+ * @param props property set to validate.
+ * @param requireAdminType when true, a missing PUBSUB_ENDPOINT_ADMIN_TYPE entry is logged.
+ * @param requireSerializerType when true, a missing PUBSUB_ENDPOINT_SERIALIZER entry is logged.
+ * @return true when the five mandatory entries listed above are all present.
+ */
+bool pubsubEndpoint_isValid(const celix_properties_t *props, bool requireAdminType, bool requireSerializerType) {
+ // checkProp() is the left operand everywhere so that it runs even when valid is already false.
+ bool valid = checkProp(props, PUBSUB_ENDPOINT_UUID);
+ valid = checkProp(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID) && valid;
+ valid = checkProp(props, PUBSUB_ENDPOINT_TYPE) && valid;
+ if (requireAdminType) {
+ (void)checkProp(props, PUBSUB_ENDPOINT_ADMIN_TYPE); // log only, see NOTE(review) above
+ }
+ if (requireSerializerType) {
+ (void)checkProp(props, PUBSUB_ENDPOINT_SERIALIZER); // log only, see NOTE(review) above
}
- return valid;
+ valid = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME) && valid;
+ valid = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE) && valid;
+
+ return valid;
}
+
+/**
+ * Sets a single entry in the endpoint's property set and refreshes the cached members.
+ *
+ * The call is ignored when the endpoint, its property set, the key or the value is NULL,
+ * as the previous implementation of this function also refused NULL keys and values and a
+ * missing property set.
+ *
+ * @param ep endpoint to update; may be NULL.
+ * @param key name of the entry to set.
+ * @param val new value for the entry.
+ */
+void pubsubEndpoint_setField(pubsub_endpoint_t *ep, const char *key, const char *val) {
+ if (ep == NULL || ep->properties == NULL || key == NULL || val == NULL) {
+ return;
+ }
+
+ celix_properties_set(ep->properties, key, val);
+
+ // Any of the well-known entries may just have been replaced, so re-read all cached
+ // members from the property set instead of guessing which one changed.
+ ep->topicName = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ ep->topicScope = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ ep->uuid = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_UUID, NULL);
+ ep->frameworkUUid = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
+ ep->type = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_TYPE, NULL);
+ ep->adminType = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+ ep->serializerType = celix_properties_get(ep->properties, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
index eb874fe..0395816 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
@@ -16,229 +16,126 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * pstm_activator.c
- *
- * \date Sep 29, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
+
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <celix_bundle_activator.h>
-#include "constants.h"
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "service_registration.h"
+#include "celix_api.h"
-#include "listener_hook_service.h"
#include "log_service.h"
#include "log_helper.h"
#include "pubsub_topology_manager.h"
-#include "publisher_endpoint_announce.h"
+#include "pubsub_listeners.h"
-struct activator {
- bundle_context_pt context;
-
- pubsub_topology_manager_pt manager;
-
- service_tracker_pt pubsubDiscoveryTracker;
- service_tracker_pt pubsubAdminTracker;
- service_tracker_pt pubsubSubscribersTracker;
+typedef struct pstm_activator { /* State of the topology manager bundle, filled in by pstm_start and torn down
+ * by pstm_stop. All *Id members are reset to -1 at the top of pstm_start. */
+ pubsub_topology_manager_t *manager; // created by pubsub_topologyManager_create() in pstm_start, destroyed in pstm_stop
+ long pubsubDiscoveryTrackerId; // tracker on PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE
+ long pubsubAdminTrackerId; // tracker on PUBSUB_ADMIN_SERVICE
+ long pubsubSubscribersTrackerId; // tracker on PUBSUB_SUBSCRIBER_SERVICE_NAME
+ long pubsubPublishServiceTrackerId; // tracker-of-trackers on PUBSUB_PUBLISHER_SERVICE_NAME
listener_hook_service_pt hookService;
service_registration_pt hook;
- publisher_endpoint_announce_pt publisherEPDiscover;
- service_registration_pt publisherEPDiscoverService;
+ pubsub_discovered_endpoint_listener_t discListenerSvc; // service struct registered as PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE
+ long discListenerSvcId; // id returned when registering discListenerSvc
log_helper_pt loghelper;
-};
-
+} pstm_activator_t;
-static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker);
-static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker);
-static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker);
+/**
+ * Bundle start callback (wired up through CELIX_GEN_BUNDLE_ACTIVATOR).
+ *
+ * Resets all tracker/service ids to -1, starts the log helper and creates the topology
+ * manager. Only when that succeeded it opens the discovery, admin and subscriber service
+ * trackers, starts tracking publisher service trackers and registers the
+ * discovered-endpoint listener service, all with the manager as callback handle.
+ *
+ * @param act activator state to fill in.
+ * @param ctx bundle context used for all tracking and registration.
+ * @return the status of pubsub_topologyManager_create(), so that a failed start is
+ *         reported instead of being masked by an unconditional 0.
+ */
+static int pstm_start(pstm_activator_t *act, celix_bundle_context_t *ctx) {
+ celix_status_t status = CELIX_SUCCESS;
-static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) {
- celix_status_t status;
+ act->discListenerSvcId = -1L;
+ act->pubsubSubscribersTrackerId = -1L;
+ act->pubsubAdminTrackerId = -1L;
+ act->pubsubDiscoveryTrackerId = -1L;
+ act->pubsubPublishServiceTrackerId = -1L;
- service_tracker_customizer_pt customizer = NULL;
+ logHelper_create(ctx, &act->loghelper);
+ logHelper_start(act->loghelper);
- status = serviceTrackerCustomizer_create(activator->manager,
- NULL,
- pubsub_topologyManager_pubsubDiscoveryAdded,
- pubsub_topologyManager_pubsubDiscoveryModified,
- pubsub_topologyManager_pubsubDiscoveryRemoved,
- &customizer);
+ status = pubsub_topologyManager_create(ctx, act->loghelper, &act->manager);
+ //create PSD tracker
if (status == CELIX_SUCCESS) {
- status = serviceTracker_create(activator->context, (char *) PUBSUB_DISCOVERY_SERVICE, customizer, tracker);
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.addWithProperties = pubsub_topologyManager_pubsubDiscoveryAdded;
+ opts.removeWithProperties = pubsub_topologyManager_pubsubDiscoveryRemoved;
+ opts.callbackHandle = act->manager;
+ opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE;
+ act->pubsubDiscoveryTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
- return status;
-}
-
-static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker) {
- celix_status_t status = CELIX_SUCCESS;
-
- service_tracker_customizer_pt customizer = NULL;
-
- status = serviceTrackerCustomizer_create(activator->manager,
- NULL,
- pubsub_topologyManager_psaAdded,
- pubsub_topologyManager_psaModified,
- pubsub_topologyManager_psaRemoved,
- &customizer);
-
+ //create PSA tracker
if (status == CELIX_SUCCESS) {
- status = serviceTracker_create(activator->context, PUBSUB_ADMIN_SERVICE, customizer, tracker);
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.addWithProperties = pubsub_topologyManager_psaAdded;
+ opts.removeWithProperties = pubsub_topologyManager_psaRemoved;
+ opts.callbackHandle = act->manager;
+ opts.filter.serviceName = PUBSUB_ADMIN_SERVICE;
+ opts.filter.ignoreServiceLanguage = true;
+ act->pubsubAdminTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
- return status;
-}
-
-static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker) {
- celix_status_t status = CELIX_SUCCESS;
-
- service_tracker_customizer_pt customizer = NULL;
-
- status = serviceTrackerCustomizer_create(activator->manager,
- NULL,
- pubsub_topologyManager_subscriberAdded,
- pubsub_topologyManager_subscriberModified,
- pubsub_topologyManager_subscriberRemoved,
- &customizer);
-
+ //create PSSubtracker
if (status == CELIX_SUCCESS) {
- status = serviceTracker_create(activator->context, PUBSUB_SUBSCRIBER_SERVICE_NAME, customizer, tracker);
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.addWithOwner = pubsub_topologyManager_subscriberAdded;
+ opts.removeWithOwner = pubsub_topologyManager_subscriberRemoved;
+ opts.callbackHandle = act->manager;
+ opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ act->pubsubSubscribersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
- return status;
-}
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = NULL;
-
- activator = calloc(1,sizeof(struct activator));
- if (!activator) {
- return CELIX_ENOMEM;
- }
-
- activator->context = context;
-
- logHelper_create(context, &activator->loghelper);
- logHelper_start(activator->loghelper);
-
- status = pubsub_topologyManager_create(context, activator->loghelper, &activator->manager);
+ //track interest for publishers
if (status == CELIX_SUCCESS) {
- status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker);
- if (status == CELIX_SUCCESS) {
- status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
- if (status == CELIX_SUCCESS) {
- status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
- if (status == CELIX_SUCCESS) {
- *userData = activator;
- }
- }
- }
+ act->pubsubPublishServiceTrackerId = celix_bundleContext_trackServiceTrackers(ctx,
+ PUBSUB_PUBLISHER_SERVICE_NAME,
+ act->manager,
+ pubsub_topologyManager_publisherTrackerAdded,
+ pubsub_topologyManager_publisherTrackerRemoved);
}
- if(status != CELIX_SUCCESS){
- bundleActivator_destroy(activator, context);
+ //register listener for discovery event
+ if (status == CELIX_SUCCESS) {
+ act->discListenerSvc.handle = act->manager;
+ act->discListenerSvc.addDiscoveredEndpoint = pubsub_topologyManager_addDiscoveredEndpoint;
+ act->discListenerSvc.removeDiscoveredEndpoint = pubsub_topologyManager_removeDiscoveredEndpoint;
+ act->discListenerSvcId = celix_bundleContext_registerService(ctx, &act->discListenerSvc, PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE, NULL);
}
- return status;
-}
-
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- publisher_endpoint_announce_pt pubEPDiscover = calloc(1, sizeof(*pubEPDiscover));
- pubEPDiscover->handle = activator->manager;
- pubEPDiscover->announcePublisher = pubsub_topologyManager_announcePublisher;
- pubEPDiscover->removePublisher = pubsub_topologyManager_removePublisher;
- activator->publisherEPDiscover = pubEPDiscover;
-
- status += bundleContext_registerService(context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, pubEPDiscover, NULL, &activator->publisherEPDiscoverService);
-
-
- listener_hook_service_pt hookService = calloc(1,sizeof(*hookService));
- hookService->handle = activator->manager;
- hookService->added = pubsub_topologyManager_publisherTrackerAdded;
- hookService->removed = pubsub_topologyManager_publisherTrackerRemoved;
- activator->hookService = hookService;
-
- status += bundleContext_registerService(context, (char *) OSGI_FRAMEWORK_LISTENER_HOOK_SERVICE_NAME, hookService, NULL, &activator->hook);
/* NOTE: Enable those line in order to remotely expose the topic_info service
properties_pt props = properties_create();
properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE);
status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
*/
- status += serviceTracker_open(activator->pubsubAdminTracker);
-
- status += serviceTracker_open(activator->pubsubDiscoveryTracker);
-
- status += serviceTracker_open(activator->pubsubSubscribersTracker);
-
- return status;
+ return status; // report a failed pubsub_topologyManager_create() to the framework
}
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- serviceTracker_close(activator->pubsubSubscribersTracker);
- serviceTracker_close(activator->pubsubDiscoveryTracker);
- serviceTracker_close(activator->pubsubAdminTracker);
+/**
+ * Bundle stop callback (wired up through CELIX_GEN_BUNDLE_ACTIVATOR).
+ *
+ * Stops the four trackers held in the activator, unregisters the discovered-endpoint
+ * listener service, then destroys the topology manager and finally stops and destroys the
+ * log helper.
+ *
+ * @param act activator state filled in by pstm_start.
+ * @param ctx bundle context the trackers and the service were created on.
+ * @return always 0.
+ */
+static int pstm_stop(pstm_activator_t *act, celix_bundle_context_t *ctx) {
+ // Stop order: subscribers, discovery, admin, publisher interest.
+ const long trackerIds[] = {
+ act->pubsubSubscribersTrackerId,
+ act->pubsubDiscoveryTrackerId,
+ act->pubsubAdminTrackerId,
+ act->pubsubPublishServiceTrackerId
+ };
+ for (size_t i = 0; i < sizeof(trackerIds) / sizeof(trackerIds[0]); ++i) {
+ celix_bundleContext_stopTracker(ctx, trackerIds[i]);
+ }
+ celix_bundleContext_unregisterService(ctx, act->discListenerSvcId);
- serviceRegistration_unregister(activator->publisherEPDiscoverService);
- free(activator->publisherEPDiscover);
+ pubsub_topologyManager_destroy(act->manager);
- serviceRegistration_unregister(activator->hook);
- free(activator->hookService);
+ logHelper_stop(act->loghelper);
+ logHelper_destroy(&act->loghelper);
- return status;
+ return 0;
}
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
-
- struct activator *activator = userData;
- if (activator == NULL) {
- status = CELIX_BUNDLE_EXCEPTION;
- } else {
-
- if(activator->pubsubSubscribersTracker!=NULL){
- serviceTracker_destroy(activator->pubsubSubscribersTracker);
- }
- if(activator->pubsubDiscoveryTracker!=NULL){
- serviceTracker_destroy(activator->pubsubDiscoveryTracker);
- }
- if(activator->pubsubAdminTracker!=NULL){
- serviceTracker_destroy(activator->pubsubAdminTracker);
- }
-
- if(activator->manager!=NULL){
- status = pubsub_topologyManager_destroy(activator->manager);
- }
-
- logHelper_stop(activator->loghelper);
- logHelper_destroy(&activator->loghelper);
-
- free(activator);
- }
-
- return status;
-}
+CELIX_GEN_BUNDLE_ACTIVATOR(pstm_activator_t, pstm_start, pstm_stop);
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 5b983d4..834e3a8 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -27,6 +27,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
+#include <celix_api.h>
#include "hash_map.h"
#include "array_list.h"
@@ -37,7 +38,7 @@
#include "log_service.h"
#include "log_helper.h"
-#include "publisher_endpoint_announce.h"
+#include "pubsub_listeners.h"
#include "pubsub_topology_manager.h"
#include "pubsub_admin.h"
@@ -51,15 +52,9 @@ static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
fprintf(outStream, " Endpoint %d\n", i);
fprintf(outStream, " Endpoint properties\n");
const char *propKey;
- if(ep->endpoint_props) {
- PROPERTIES_FOR_EACH(ep->endpoint_props, propKey) {
- fprintf(outStream, " %s => %s\n", propKey, properties_get(ep->endpoint_props, propKey));
- }
- }
- if(ep->topic_props) {
- fprintf(outStream, " Topic properties\n");
- PROPERTIES_FOR_EACH(ep->topic_props, propKey) {
- fprintf(outStream, " %s => %s\n", propKey, properties_get(ep->topic_props, propKey));
+ if(ep->properties) {
+ PROPERTIES_FOR_EACH(ep->properties, propKey) {
+ fprintf(outStream, " %s => %s\n", propKey, celix_properties_get(ep->properties, propKey, NULL));
}
}
}
@@ -68,7 +63,7 @@ static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
}
static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
- pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt) handle;
+ pubsub_topology_manager_t *manager = (pubsub_topology_manager_t*) handle;
if (manager->publications && !hashMap_isEmpty(manager->publications)) {
fprintf(outStream, "Publications:\n");
print_endpoint_info(manager->publications, outStream);
@@ -80,7 +75,7 @@ static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outSt
return CELIX_SUCCESS;
}
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager) {
celix_status_t status = CELIX_SUCCESS;
*manager = calloc(1, sizeof(**manager));
@@ -102,7 +97,7 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
arrayList_create(&(*manager)->psaList);
- (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+ (*manager)->discoveryList = hashMap_create(NULL, NULL, NULL, NULL);
(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -125,7 +120,7 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
return status;
}
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) {
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&manager->discoveryListLock);
@@ -173,12 +168,11 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
return status;
}
-celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+ pubsub_topology_manager_t *manager = handle;
unsigned int i;
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+ pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
celixThreadMutex_lock(&manager->psaListLock);
@@ -194,7 +188,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
while (hashMapIterator_hasNext(subscriptionsIterator)) {
array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
for(i=0;i<arrayList_size(sub_ep_list);i++){
- status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
+ psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
}
}
@@ -203,7 +197,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
celixThreadMutex_unlock(&manager->subscriptionsLock);
// Add already detected publications to new PSA
- status = celixThreadMutex_lock(&manager->publicationsLock);
+ celixThreadMutex_lock(&manager->publicationsLock);
hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
//TODO FIXME no matching used, should only add unmatched publications ?
@@ -211,30 +205,20 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
while (hashMapIterator_hasNext(publicationsIterator)) {
array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
for(i=0;i<arrayList_size(pub_ep_list);i++){
- status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
+ psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
}
}
hashMapIterator_destroy(publicationsIterator);
celixThreadMutex_unlock(&manager->publicationsLock);
-
- return status;
-}
-
-celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
-
- // Nop...
-
- return status;
}
-celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) {
+void pubsub_topologyManager_psaRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+ pubsub_topology_manager_t *manager = handle;
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+ pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
/* Deactivate all publications */
celixThreadMutex_lock(&manager->publicationsLock);
@@ -256,15 +240,15 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
while(hashMapIterator_hasNext(iter)){
service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- publisher_endpoint_announce_pt disc = NULL;
+ pubsub_announce_endpoint_listener_t *disc = NULL;
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
const char* fwUUID = NULL;
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
unsigned int i;
for(i=0;i<arrayList_size(pubEP_list);i++){
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
- disc->removePublisher(disc->handle,pubEP);
+ if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+ disc->removeEndpoint(disc->handle,pubEP->properties);
}
}
bundleContext_ungetService(manager->context, disc_sr, NULL);
@@ -298,19 +282,16 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
celixThreadMutex_unlock(&manager->psaListLock);
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
-
- return status;
}
-celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+ pubsub_topology_manager_t *manager = handle;
//subscriber_service_pt subscriber = (subscriber_service_pt)service;
pubsub_endpoint_pt sub = NULL;
- if(pubsubEndpoint_createFromServiceReference(manager->context, reference,false, &sub) == CELIX_SUCCESS){
+ if(pubsubEndpoint_createFromSvc(manager->context, bnd, props,false, &sub) == CELIX_SUCCESS) {
celixThreadMutex_lock(&manager->subscriptionsLock);
- char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
if(sub_list_by_topic==NULL){
@@ -345,9 +326,9 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
while(hashMapIterator_hasNext(iter)){
service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- publisher_endpoint_announce_pt disc = NULL;
+ pubsub_announce_endpoint_listener_t *disc = NULL;
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ disc->announceEndpoint(disc->handle, sub->properties);
bundleContext_ungetService(manager->context, disc_sr, NULL);
}
hashMapIterator_destroy(iter);
@@ -355,27 +336,14 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
celixThreadMutex_unlock(&manager->psaListLock);
}
- else{
- status=CELIX_INVALID_BUNDLE_CONTEXT;
- }
-
- return status;
}
-celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
- // Nop...
-
- return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+ pubsub_topology_manager_t *manager = handle;
pubsub_endpoint_pt subcmp = NULL;
- if(pubsubEndpoint_createFromServiceReference(manager->context, reference, false, &subcmp) == CELIX_SUCCESS){
+ if (pubsubEndpoint_createFromSvc(manager->context, bnd, props, false, &subcmp) == CELIX_SUCCESS){
unsigned int j,k;
@@ -384,9 +352,9 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
while(hashMapIterator_hasNext(iter)){
service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- publisher_endpoint_announce_pt disc = NULL;
+ pubsub_announce_endpoint_listener_t *disc = NULL;
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ disc->removeEndpoint(disc->handle, subcmp->properties);
bundleContext_ungetService(manager->context, disc_sr, NULL);
}
hashMapIterator_destroy(iter);
@@ -395,7 +363,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
celixThreadMutex_lock(&manager->subscriptionsLock);
celixThreadMutex_lock(&manager->psaListLock);
- char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
free(sub_key);
if(sub_list_by_topic!=NULL){
@@ -415,7 +383,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
if(arrayList_size(sub_list_by_topic)==0){
for(k=0;k<arrayList_size(manager->psaList);k++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
}
}
@@ -430,31 +398,25 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
pubsubEndpoint_destroy(subcmp);
}
- else{
- status=CELIX_INVALID_BUNDLE_CONTEXT;
- }
-
- return status;
-
}
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
- publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service;
+void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props) {
+ pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle;
+ pubsub_announce_endpoint_listener_t *disc = svc;
const char* fwUUID = NULL;
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
if(fwUUID==NULL){
printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
+ return;
}
celixThreadMutex_lock(&manager->publicationsLock);
celixThreadMutex_lock(&manager->discoveryListLock);
- hashMap_put(manager->discoveryList, reference, NULL);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ hashMap_put(manager->discoveryList, (void*)svcId, NULL);
celixThreadMutex_unlock(&manager->discoveryListLock);
hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
@@ -462,8 +424,8 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- if( (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
- status += disc->announcePublisher(disc->handle,pubEP);
+ if( (strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0)) {
+ disc->announceEndpoint(disc->handle,pubEP->properties);
}
}
}
@@ -480,202 +442,171 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
for(i=0;i<arrayList_size(l);i++){
pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
- disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ disc->announceEndpoint(disc->handle, subEp->properties);
}
}
hashMapIterator_destroy(iter);
celixThreadMutex_unlock(&manager->subscriptionsLock);
-
- return status;
}
-celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
- if (status == CELIX_SUCCESS) {
- status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
- }
-
- return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props) {
+ pubsub_topology_manager_t *manager = handle;
celixThreadMutex_lock(&manager->discoveryListLock);
- if (hashMap_remove(manager->discoveryList, reference)) {
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ if (hashMap_remove(manager->discoveryList, (void*)svcId)) {
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
}
celixThreadMutex_unlock(&manager->discoveryListLock);
-
- return status;
}
+static void tm_callAnnounce(void *handle, void *svc) {
+ pubsub_endpoint_t *pub = handle;
+ pubsub_announce_endpoint_listener_t *listener = svc;
+ listener->announceEndpoint(listener->handle, pub->properties);
+}
-celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) {
+void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) {
+ pubsub_topology_manager_t *manager = handle;
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
-
- unsigned int l_index;
+ pubsub_endpoint_pt pub = NULL;
+ celix_status_t status = pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub);
+ if (status == CELIX_SUCCESS) {
- for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
+ celixThreadMutex_lock(&manager->publicationsLock);
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
+ if(pub_list_by_topic==NULL){
+ arrayList_create(&pub_list_by_topic);
+ hashMap_put(manager->publications,pub_key,pub_list_by_topic);
+ } else {
+ free(pub_key);
+ }
+ arrayList_add(pub_list_by_topic,pub);
- listener_hook_info_pt info = arrayList_get(listeners, l_index);
+ celixThreadMutex_unlock(&manager->publicationsLock);
- pubsub_endpoint_pt pub = NULL;
- if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub) == CELIX_SUCCESS){
+ unsigned int j;
+ double score = 0;
+ double best_score = 0;
+ pubsub_admin_service_pt best_psa = NULL;
+ celixThreadMutex_lock(&manager->psaListLock);
- celixThreadMutex_lock(&manager->publicationsLock);
- char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
- array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
- if(pub_list_by_topic==NULL){
- arrayList_create(&pub_list_by_topic);
- hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
- }
- free(pub_key);
- arrayList_add(pub_list_by_topic,pub);
-
- celixThreadMutex_unlock(&manager->publicationsLock);
-
- unsigned int j;
- double score = 0;
- double best_score = 0;
- pubsub_admin_service_pt best_psa = NULL;
- celixThreadMutex_lock(&manager->psaListLock);
-
- for(j=0;j<arrayList_size(manager->psaList);j++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
- psa->matchEndpoint(psa->admin,pub,&score);
- if(score>best_score){ /* We have a new winner! */
- best_score = score;
- best_psa = psa;
- }
+ int size = celix_arrayList_size(manager->psaList);
+ for (j=0; j<size; j++) {
+ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+ psa->matchEndpoint(psa->admin,pub,&score);
+ if(score>best_score){ /* We have a new winner! */
+ best_score = score;
+ best_psa = psa;
}
+ }
- if (best_psa != NULL && best_score > 0) {
- status = best_psa->addPublication(best_psa->admin,pub);
- if(status==CELIX_SUCCESS){
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
- while(hashMapIterator_hasNext(iter)){
- service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- publisher_endpoint_announce_pt disc = NULL;
- bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->announcePublisher(disc->handle,pub);
- bundleContext_ungetService(manager->context, disc_sr, NULL);
- }
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->discoveryListLock);
+ if (best_psa != NULL && best_score > 0) {
+ celix_status_t status = best_psa->addPublication(best_psa->admin,pub);
+ if(status==CELIX_SUCCESS){
+ celixThreadMutex_lock(&manager->discoveryListLock);
+ hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveryList);
+ while(hashMapIterator_hasNext(&iter)) {
+ long svcId = (long)hashMapIterator_nextKey(&iter);
+ celix_bundleContext_useServiceWithId(manager->context, svcId, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, pub, tm_callAnnounce);
}
+ celixThreadMutex_unlock(&manager->discoveryListLock);
}
-
- celixThreadMutex_unlock(&manager->psaListLock);
-
}
- }
-
- return status;
+ celixThreadMutex_unlock(&manager->psaListLock);
+ }
}
-celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) {
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
-
- unsigned int l_index;
+void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) {
+ pubsub_topology_manager_t *manager = handle;
- for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
-
- listener_hook_info_pt info = arrayList_get(listeners, l_index);
-
- pubsub_endpoint_pt pubcmp = NULL;
- if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){
-
-
- unsigned int j,k;
- celixThreadMutex_lock(&manager->psaListLock);
- celixThreadMutex_lock(&manager->publicationsLock);
-
- char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
- array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
- if(pub_list_by_topic!=NULL){
- for(j=0;j<arrayList_size(pub_list_by_topic);j++){
- pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
- if(pubsubEndpoint_equals(pub,pubcmp)){
- for(k=0;k<arrayList_size(manager->psaList);k++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- status = psa->removePublication(psa->admin,pub);
- if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
- celixThreadMutex_lock(&manager->discoveryListLock);
- hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
- while(hashMapIterator_hasNext(iter)){
- service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
- publisher_endpoint_announce_pt disc = NULL;
- bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->removePublisher(disc->handle,pub);
- bundleContext_ungetService(manager->context, disc_sr, NULL);
- }
- hashMapIterator_destroy(iter);
- celixThreadMutex_unlock(&manager->discoveryListLock);
- }
- else if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
- status = CELIX_SUCCESS;
+ pubsub_endpoint_pt pubcmp = NULL;
+ if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){
+ unsigned int j,k;
+ celixThreadMutex_lock(&manager->psaListLock);
+ celixThreadMutex_lock(&manager->publicationsLock);
+
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+ if(pub_list_by_topic!=NULL){
+ for(j=0;j<arrayList_size(pub_list_by_topic);j++){
+ pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
+ if(pubsubEndpoint_equals(pub,pubcmp)){
+ for(k=0;k<arrayList_size(manager->psaList);k++){
+ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+ celix_status_t status = psa->removePublication(psa->admin,pub);
+ if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
+ celixThreadMutex_lock(&manager->discoveryListLock);
+ hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+ while(hashMapIterator_hasNext(iter)){
+ service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+ pubsub_announce_endpoint_listener_t *disc = NULL;
+ bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+ disc->removeEndpoint(disc->handle,pub->properties);
+ bundleContext_ungetService(manager->context, disc_sr, NULL);
}
+ hashMapIterator_destroy(iter);
+ celixThreadMutex_unlock(&manager->discoveryListLock);
}
- //}
- arrayList_remove(pub_list_by_topic,j);
-
- /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
- if(arrayList_size(pub_list_by_topic)==0){
- for(k=0;k<arrayList_size(manager->psaList);k++){
- pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
- }
+ else if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
+ status = CELIX_SUCCESS;
}
+ }
+ //}
+ arrayList_remove(pub_list_by_topic,j);
- pubsubEndpoint_destroy(pub);
+ /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
+ if(arrayList_size(pub_list_by_topic)==0){
+ for(k=0;k<arrayList_size(manager->psaList);k++){
+ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+ psa->closeAllPublications(psa->admin, (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ }
}
+ pubsubEndpoint_destroy(pub);
}
- }
- celixThreadMutex_unlock(&manager->publicationsLock);
- celixThreadMutex_unlock(&manager->psaListLock);
+ }
+ }
- free(pub_key);
+ celixThreadMutex_unlock(&manager->publicationsLock);
+ celixThreadMutex_unlock(&manager->psaListLock);
- pubsubEndpoint_destroy(pubcmp);
+ free(pub_key);
- }
+ pubsubEndpoint_destroy(pubcmp);
}
-
- return status;
}
-celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){
+static celix_status_t pubsub_topologyManager_addDiscoveredPublisher(void *handle, const celix_properties_t *pubProperties){
celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+ pubsub_topology_manager_t *manager = handle;
+
+ const char *topic = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *scope = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *fwUid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
+ const char *uuid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_UUID, NULL);
+
if (manager->verbose) {
- printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ printf("PSTM: New publisher discovered for scope/topic %s/%s [fwUUID=%s, epUUID=%s]\n",
+ scope, topic, fwUid, uuid);
}
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
- char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
if(pub_list_by_topic==NULL){
@@ -686,7 +617,7 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
pubsub_endpoint_pt p = NULL;
- pubsubEndpoint_clone(pubEP, &p);
+ pubsubEndpoint_createFromProperties(pubProperties, &p);
arrayList_add(pub_list_by_topic , p);
unsigned int j;
@@ -718,27 +649,25 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
return status;
}
-celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){
- celix_status_t status = CELIX_SUCCESS;
- pubsub_topology_manager_pt manager = handle;
+static celix_status_t pubsub_topologyManager_removeDiscoveredPublisher(void *handle, const celix_properties_t *props) {
+ pubsub_topology_manager_t *manager = handle;
if (manager->verbose) {
printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
+ celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+ celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+ celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+ celix_properties_get(props, PUBSUB_ENDPOINT_UUID, NULL));
}
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
unsigned int i;
- char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
if(pub_list_by_topic==NULL){
- printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
- status = CELIX_ILLEGAL_STATE;
+ printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),celix_properties_get(props, "pubsub.url", NULL));
}
else{
@@ -747,7 +676,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
- found = pubsubEndpoint_equals(p,pubEP);
+ found = pubsubEndpoint_equalsWithProperties(p,props);
}
if(found && p !=NULL){
@@ -765,7 +694,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
for(i=0;i<arrayList_size(manager->psaList);i++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
- psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ psa->closeAllPublications(psa->admin, (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
}
}
@@ -779,6 +708,31 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
celixThreadMutex_unlock(&manager->psaListLock);
- return status;
+ return CELIX_SUCCESS;
}
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties) {
+ const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL);
+ if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ return pubsub_topologyManager_addDiscoveredPublisher(handle, properties);
+ } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
+ //nop //TODO add subscription to pubsub admins
+ } else {
+ fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
+ }
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties) {
+ const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL);
+ if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ pubsub_topologyManager_removeDiscoveredPublisher(handle, properties);
+ } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
+ //nop //TODO remove subscription from pubsub admins
+ } else {
+ fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
+ }
+ return CELIX_SUCCESS;
+}
+
+
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 769048d..dda84a0 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -16,13 +16,7 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * pubsub_topology_manager.h
- *
- * \date Sep 29, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
+
#ifndef PUBSUB_TOPOLOGY_MANAGER_H_
#define PUBSUB_TOPOLOGY_MANAGER_H_
@@ -31,13 +25,14 @@
#include "bundle_context.h"
#include "log_helper.h"
#include "command.h"
+#include "celix_bundle_context.h"
#include "pubsub_common.h"
#include "pubsub_endpoint.h"
#include "pubsub/publisher.h"
#include "pubsub/subscriber.h"
- #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
+#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
@@ -48,7 +43,7 @@ struct pubsub_topology_manager {
array_list_pt psaList;
celix_thread_mutex_t discoveryListLock;
- hash_map_pt discoveryList; //<serviceReference,NULL>
+ hash_map_pt discoveryList; //<svcId,NULL>
celix_thread_mutex_t publicationsLock;
hash_map_pt publications; //<topic(string),list<pubsub_ep>>
@@ -65,28 +60,25 @@ struct pubsub_topology_manager {
bool verbose;
};
-typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
+typedef struct pubsub_topology_manager pubsub_topology_manager_t;
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager);
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager);
-celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager);
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager);
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
+celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *manager);
-celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service);
+void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service);
+void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props);
-celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service);
+void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
+void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
-celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners);
-celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners);
+void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info);
+void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info);
-celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties);
+celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties);
#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/bundles/shell/shell/src/lb_command.c
----------------------------------------------------------------------
diff --git a/bundles/shell/shell/src/lb_command.c b/bundles/shell/shell/src/lb_command.c
index e4968f3..972c10a 100644
--- a/bundles/shell/shell/src/lb_command.c
+++ b/bundles/shell/shell/src/lb_command.c
@@ -33,7 +33,7 @@ static const char * const ODD_COLOR = "\033[3m"; //italic
static const char * const END_COLOR = "\033[0m";
-#define NONE_GROUP "-"
+#define NONE_GROUP ""
typedef struct lb_options {
//details
@@ -65,7 +65,7 @@ static void collectGroups(void *handle, const celix_bundle_t *bnd) {
hash_map_t *map = handle;
const char *group = celix_bundle_getGroup(bnd);
if (group == NULL) {
- group = NONE_GROUP;
+ group = "-";
addToGroup(map, group, celix_bundle_getId(bnd));
} else {
char *at = strstr(group, "/");
@@ -74,7 +74,6 @@ static void collectGroups(void *handle, const celix_bundle_t *bnd) {
char buf[size+1];
strncpy(buf, group, size);
buf[size] = '\0';
- group = buf;
addToGroup(map, buf, celix_bundle_getId(bnd));
} else {
addToGroup(map, group, celix_bundle_getId(bnd));
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/etcdlib/api/etcd.h
----------------------------------------------------------------------
diff --git a/libs/etcdlib/api/etcd.h b/libs/etcdlib/api/etcd.h
index 07fc0af..4f9f8b8 100644
--- a/libs/etcdlib/api/etcd.h
+++ b/libs/etcdlib/api/etcd.h
@@ -82,7 +82,15 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback,
int etcd_set(const char* key, const char* value, int ttl, bool prevExist);
/**
- * @desc Setting an Etcd-key/value and checks if there is a different previuos value
+ * @desc Refresh the ttl of an existing key.
+ * @param key the etcd key to refresh.
+ * @param ttl the ttl value to use.
+ * @return 0 on success, non zero otherwise.
+ */
+int etcd_refresh(const char *key, int ttl);
+
+/**
+ * @desc Setting an Etcd-key/value and checks if there is a different previous value
* @param const char* key. The Etcd-key (Note: a leading '/' should be avoided)
* @param const char* value. The Etcd-value
* @param int ttl. If non-zero this is used as the TTL value
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/etcdlib/src/etcd.c
----------------------------------------------------------------------
diff --git a/libs/etcdlib/src/etcd.c b/libs/etcdlib/src/etcd.c
index 5aa9b02..f6a0c9b 100644
--- a/libs/etcdlib/src/etcd.c
+++ b/libs/etcdlib/src/etcd.c
@@ -267,9 +267,6 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback,
return retVal;
}
-/**
- * etcd_set
- */
int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
json_error_t error;
json_t* js_root = NULL;
@@ -290,14 +287,15 @@ int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */
reply.memorySize = 0; /* no data at this point */
- reply.header = NULL; /* will be grown as needed by the realloc above */
- reply.headerSize = 0; /* no data at this point */
+ reply.header = NULL; /* will be grown as needed by the realloc above */
+ reply.headerSize = 0; /* no data at this point */
asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
requestPtr += snprintf(requestPtr, req_len, "value=%s", value);
if (ttl > 0) {
requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
+ requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
}
if (prevExist) {
@@ -336,6 +334,44 @@ int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
}
+int etcd_refresh(const char* key, int ttl) { // Issues a PUT on the key URL with a ttl/prevExists/refresh body; returns ETCDLIB_RC_OK when performRequest yields CURLE_OK, otherwise ETCDLIB_RC_ERROR.
+ int retVal = ETCDLIB_RC_ERROR; // pessimistic default, only overwritten on CURLE_OK below
+ char *url; // NOTE(review): not initialized and the asprintf result is not checked; if asprintf fails the if(url) test below may read an indeterminate pointer - consider initializing to NULL
+ size_t req_len = MAX_OVERHEAD_LENGTH;
+ char request[req_len]; // request body buffer (variable-length array sized by MAX_OVERHEAD_LENGTH)
+
+ int res;
+ struct MemoryStruct reply;
+
+ /* Skip leading '/', etcd cannot handle this. */
+ while(*key == '/') { // NOTE(review): key is dereferenced without a NULL check
+ key++;
+ }
+
+ reply.memory = calloc(1, 1); /* starts as a 1-byte zeroed buffer; NOTE(review): presumably grown while the response is received - confirm; the calloc result is not checked */
+ reply.memorySize = 0; /* no data at this point */
+ reply.header = NULL; /* NOTE(review): never freed in this function - confirm performRequest cannot allocate it for this call */
+ reply.headerSize = 0; /* no data at this point */
+
+ asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); // target URL built from etcd_server, etcd_port and the stripped key
+ snprintf(request, req_len, "ttl=%d;prevExists=true;refresh=true", ttl); // body carries only the ttl plus the prevExists and refresh flags, no value
+
+ res = performRequest(url, PUT, request, (void*) &reply);
+ if(url) {
+ free(url);
+ }
+
+ if (res == CURLE_OK) { // only the transfer result is inspected; the reply content is not examined here
+ retVal = ETCDLIB_RC_OK;
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+
+ return retVal;
+}
+
/**
* etcd_set_with_check
*/
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/framework/include/celix_bundle_context.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h
index c0e72e6..5bb6faa 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -201,12 +201,19 @@ typedef struct celix_service_filter_options {
* The optional service language to filter for. If this is NULL or "" the C language will be used.
*/
const char* serviceLanguage;
+
+
+ /**
+ * Whether to ignore (not filter for) the service.lang property.
+ * If this is set, the serviceLanguage field is ignored and the (service.lang=<>) part is not added to the filter.
+ */
+ bool ignoreServiceLanguage;
} celix_service_filter_options_t;
/**
* Macro to create a empty celix_service_filter_options_t type.
*/
-#define CELIX_EMPTY_SERVICE_FILTER_OPTIONS {.serviceName = NULL, .versionRange = NULL, .filter = NULL, .serviceLanguage = NULL}
+#define CELIX_EMPTY_SERVICE_FILTER_OPTIONS {.serviceName = NULL, .versionRange = NULL, .filter = NULL, .serviceLanguage = NULL, .ignoreServiceLanguage = false}
/**
@@ -773,12 +780,34 @@ celix_bundle_t* celix_bundleContext_getBundle(celix_bundle_context_t *ctx);
/**
* Gets the config property - or environment variable if the config property does not exist - for the provided name.
- * @param key The key of the property to receive
- * @param defaultVal The default value to use if the property is not found (can be NULL)
+ * @param key The key of the property to receive.
+ * @param defaultVal The default value to use if the property is not found (can be NULL).
* @return The property value for the provided key or the provided defaultValue is the key is not found.
*/
const char* celix_bundleContext_getProperty(celix_bundle_context_t *ctx, const char *key, const char *defaultVal);
+/**
+ * Gets the config property and converts it to long. If the property is not a valid long, the defaultValue will be returned.
+ * The rest of the behaviour is the same as celix_bundleContext_getProperty.
+ *
+ * @param key The key of the property to receive.
+ * @param defaultValue The default value to use if the property is not found.
+ * @return The property value for the provided key, or the provided defaultValue if the key is not found.
+ */
+long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const char *key, long defaultValue);
+
+/**
+ * Gets the config property and converts it to bool. If the property is not a valid bool, the defaultValue will be returned.
+ * The rest of the behaviour is the same as celix_bundleContext_getProperty.
+ *
+ * @param key The key of the property to receive.
+ * @param defaultValue The default value to use if the property is not found.
+ * @return The property value for the provided key, or the provided defaultValue if the key is not found.
+ */
+bool celix_bundleContext_getPropertyAsBool(celix_bundle_context_t *ctx, const char *key, bool defaultValue);
+
+//TODO getPropertyAs for int, uint, ulong, bool, etc
+
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/framework/src/bundle_context.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/bundle_context.c b/libs/framework/src/bundle_context.c
index 85f36db..b8b1552 100644
--- a/libs/framework/src/bundle_context.c
+++ b/libs/framework/src/bundle_context.c
@@ -21,6 +21,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
+#include <utils.h>
#include "constants.h"
#include "bundle_context_private.h"
@@ -1022,3 +1023,34 @@ const char* celix_bundleContext_getProperty(celix_bundle_context_t *ctx, const c
}
return val;
}
+
+long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const char *key, long defaultValue) { // Looks up key via celix_bundleContext_getProperty and parses it as a base-10 long; returns defaultValue when the lookup yields NULL or parsing fails.
+ long result = defaultValue;
+ const char *val = celix_bundleContext_getProperty(ctx, key, NULL); // NULL is passed as the default so a missing property can be detected
+ if (val != NULL) {
+ char *enptr = NULL;
+ errno = 0; // cleared so a non-zero errno after strtol can be attributed to this call; NOTE(review): needs errno.h, this hunk only adds utils.h - confirm it is included elsewhere
+ long r = strtol(val, &enptr, 10);
+ if (enptr != val && errno == 0) { // accepted when at least one character was consumed and errno stayed 0; characters after the number are tolerated
+ result = r;
+ }
+ }
+ return result;
+}
+
+
+bool celix_bundleContext_getPropertyAsBool(celix_bundle_context_t *ctx, const char *key, bool defaultValue) { // Looks up key via celix_bundleContext_getProperty and maps a value starting with true or false (case-insensitive) to a bool; otherwise returns defaultValue.
+ bool result = defaultValue;
+ const char *val = celix_bundleContext_getProperty(ctx, key, NULL); // NULL is passed as the default so a missing property can be detected
+ if (val != NULL) {
+ char buf[32];
+ snprintf(buf, 32, "%s", val); // copy into a local buffer of 32 bytes; longer values are truncated to 31 characters
+ char *trimmed = utils_stringTrim(buf); // presumably strips surrounding whitespace from buf - TODO confirm against utils.h
+ if (strncasecmp("true", trimmed, strlen("true")) == 0) { // compares only the first 4 characters, so any value beginning with true matches
+ result = true;
+ } else if (strncasecmp("false", trimmed, strlen("false")) == 0) { // compares only the first 5 characters, so any value beginning with false matches
+ result = false;
+ }
+ }
+ return result;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/framework/src/service_tracker.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/service_tracker.c b/libs/framework/src/service_tracker.c
index 93b3365..1bc26e1 100644
--- a/libs/framework/src/service_tracker.c
+++ b/libs/framework/src/service_tracker.c
@@ -742,18 +742,33 @@ celix_service_tracker_t* celix_serviceTracker_createWithOptions(
}
//setting filter
- if (opts->filter.filter != NULL && opts->filter.versionRange != NULL) {
- //TODO version range
- asprintf(&tracker->filter, "&((%s=%s)(%s=%s)%s)", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang, opts->filter.filter);
- } else if (opts->filter.versionRange != NULL) {
- //TODO version range
- asprintf(&tracker->filter, "&((%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang);
- } else if (opts->filter.filter != NULL) {
- asprintf(&tracker->filter, "(&(%s=%s)(%s=%s)%s)", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang, opts->filter.filter);
+ if (opts->filter.ignoreServiceLanguage) {
+ if (opts->filter.filter != NULL && opts->filter.versionRange != NULL) {
+ //TODO version range
+ asprintf(&tracker->filter, "&((%s=%s)%s)", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, opts->filter.filter);
+ } else if (opts->filter.versionRange != NULL) {
+ //TODO version range
+ asprintf(&tracker->filter, "&((%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName);
+ } else if (opts->filter.filter != NULL) {
+ asprintf(&tracker->filter, "(&(%s=%s)%s)", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, opts->filter.filter);
+ } else {
+ asprintf(&tracker->filter, "(&(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName);
+ }
} else {
- asprintf(&tracker->filter, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang);
+ if (opts->filter.filter != NULL && opts->filter.versionRange != NULL) {
+ //TODO version range
+ asprintf(&tracker->filter, "&((%s=%s)(%s=%s)%s)", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang, opts->filter.filter);
+ } else if (opts->filter.versionRange != NULL) {
+ //TODO version range
+ asprintf(&tracker->filter, "&((%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang);
+ } else if (opts->filter.filter != NULL) {
+ asprintf(&tracker->filter, "(&(%s=%s)(%s=%s)%s)", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang, opts->filter.filter);
+ } else {
+ asprintf(&tracker->filter, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, opts->filter.serviceName, CELIX_FRAMEWORK_SERVICE_LANGUAGE, lang);
+ }
}
+
//TODO open on other thread?
serviceTracker_open(tracker);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/utils/include/celix_properties.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_properties.h b/libs/utils/include/celix_properties.h
index 8a547d0..e325222 100644
--- a/libs/utils/include/celix_properties.h
+++ b/libs/utils/include/celix_properties.h
@@ -57,12 +57,15 @@ void celix_properties_set(celix_properties_t *properties, const char *key, const
void celix_properties_unset(celix_properties_t *properties, const char *key);
-celix_properties_t* celix_properties_copy(celix_properties_t *properties);
+celix_properties_t* celix_properties_copy(const celix_properties_t *properties);
long celix_properties_getAsLong(const celix_properties_t *props, const char *key, long defaultValue);
void celix_properties_setLong(celix_properties_t *props, const char *key, long value);
+bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue);
+void celix_properties_setBool(celix_properties_t *props, const char *key, bool val);
+
#define CELIX_PROPERTIES_FOR_EACH(props, key) \
for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);)
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/utils/include/celix_threads.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_threads.h b/libs/utils/include/celix_threads.h
index 6af57bb..a9a3049 100644
--- a/libs/utils/include/celix_threads.h
+++ b/libs/utils/include/celix_threads.h
@@ -123,6 +123,8 @@ celix_status_t celixThreadCondition_destroy(celix_thread_cond_t *condition);
celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex);
+celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
+
celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond);
celix_status_t celixThreadCondition_signal(celix_thread_cond_t *cond);
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/utils/private/test/properties_test.cpp
----------------------------------------------------------------------
diff --git a/libs/utils/private/test/properties_test.cpp b/libs/utils/private/test/properties_test.cpp
index 565f564..8ea6933 100644
--- a/libs/utils/private/test/properties_test.cpp
+++ b/libs/utils/private/test/properties_test.cpp
@@ -158,3 +158,64 @@ TEST(properties, getSet) {
properties_destroy(properties);
}
+TEST(properties, longTest) { // Exercises celix_properties_getAsLong and celix_properties_setLong with valid, suffixed, empty, non-numeric and missing values.
+ properties = properties_create(); // presumably properties is declared by the enclosing test group - not visible in this hunk
+
+ celix_properties_set(properties, "a", "2");
+ celix_properties_set(properties, "b", "-10032L"); // numeric prefix followed by a trailing character
+ celix_properties_set(properties, "c", ""); // empty value
+ celix_properties_set(properties, "d", "garbage"); // non-numeric value
+
+ long a = celix_properties_getAsLong(properties, "a", -1L);
+ long b = celix_properties_getAsLong(properties, "b", -1L);
+ long c = celix_properties_getAsLong(properties, "c", -1L);
+ long d = celix_properties_getAsLong(properties, "d", -1L);
+ long e = celix_properties_getAsLong(properties, "e", -1L); // key e is never set
+
+ CHECK_EQUAL(2, a);
+ CHECK_EQUAL(-10032L, b); // the trailing L is expected to be ignored
+ CHECK_EQUAL(-1L, c); // empty string is expected to fall back to the default
+ CHECK_EQUAL(-1L, d); // non-numeric text is expected to fall back to the default
+ CHECK_EQUAL(-1L, e); // missing key is expected to fall back to the default
+
+ celix_properties_setLong(properties, "a", 3L); // overwrite existing keys through the long setter
+ celix_properties_setLong(properties, "b", -4L);
+ a = celix_properties_getAsLong(properties, "a", -1L);
+ b = celix_properties_getAsLong(properties, "b", -1L);
+ CHECK_EQUAL(3L, a);
+ CHECK_EQUAL(-4L, b);
+
+ celix_properties_destroy(properties);
+}
+
+TEST(properties, boolTest) { // Exercises celix_properties_getAsBool and celix_properties_setBool with valid, padded, non-boolean and missing values.
+ properties = properties_create(); // presumably properties is declared by the enclosing test group - not visible in this hunk
+
+ celix_properties_set(properties, "a", "true");
+ celix_properties_set(properties, "b", "false");
+ celix_properties_set(properties, "c", " true "); // value padded with spaces
+ celix_properties_set(properties, "d", "garbage"); // non-boolean value
+
+ bool a = celix_properties_getAsBool(properties, "a", false);
+ bool b = celix_properties_getAsBool(properties, "b", true);
+ bool c = celix_properties_getAsBool(properties, "c", false);
+ bool d = celix_properties_getAsBool(properties, "d", true);
+ bool e = celix_properties_getAsBool(properties, "e", false); // key e is never set
+ bool f = celix_properties_getAsBool(properties, "f", true); // key f is never set
+
+ CHECK_EQUAL(true, a);
+ CHECK_EQUAL(false, b);
+ CHECK_EQUAL(true, c); // surrounding spaces are expected to be ignored
+ CHECK_EQUAL(true, d); // non-boolean text is expected to fall back to the default (true)
+ CHECK_EQUAL(false, e); // missing key is expected to fall back to the default (false)
+ CHECK_EQUAL(true, f); // missing key is expected to fall back to the default (true)
+
+ celix_properties_setBool(properties, "a", true); // overwrite existing keys through the bool setter
+ celix_properties_setBool(properties, "b", false);
+ a = celix_properties_getAsBool(properties, "a", false);
+ b = celix_properties_getAsBool(properties, "b", true);
+ CHECK_EQUAL(true, a);
+ CHECK_EQUAL(false, b);
+
+ celix_properties_destroy(properties);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/e30a70f7/libs/utils/src/celix_threads.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/celix_threads.c b/libs/utils/src/celix_threads.c
index 64bdf5b..d8a8091 100644
--- a/libs/utils/src/celix_threads.c
+++ b/libs/utils/src/celix_threads.c
@@ -143,6 +143,13 @@ celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread
return pthread_cond_wait(cond, mutex);
}
+celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) { // Packs seconds and nanoseconds into a timespec and forwards it to pthread_cond_timedwait, returning its result unchanged.
+ struct timespec time;
+ time.tv_sec = seconds; // values are copied as-is; no current time is added here
+ time.tv_nsec = nanoseconds; // NOTE(review): no range check; POSIX expects tv_nsec below one billion - confirm callers normalize
+ return pthread_cond_timedwait(cond, mutex, &time); // NOTE(review): POSIX interprets this timespec as an absolute deadline, so callers must pass an absolute time rather than a duration - confirm against callers
+}
+
celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond) { // Thin wrapper: forwards cond to pthread_cond_broadcast and returns its result unchanged.
return pthread_cond_broadcast(cond);
}