You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/02/20 12:39:25 UTC
[1/3] celix git commit: Refactors the pubsub spi and fixes an issue
with pubsub endpoint matching.
Repository: celix
Updated Branches:
refs/heads/develop a55fd1523 -> 109edf4d0
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/include/pubsub/publisher.h b/pubsub/pubsub_api/include/pubsub/publisher.h
index 3eec149..9f7f3b6 100644
--- a/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -30,12 +30,11 @@
#include <stdlib.h>
#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0"
//properties
-#define PUBSUB_PUBLISHER_TOPIC "pubsub.topic"
-#define PUBSUB_PUBLISHER_SCOPE "pubsub.scope"
-#define PUBSUB_PUBLISHER_STRATEGY "pubsub.strategy"
+#define PUBSUB_PUBLISHER_TOPIC "topic"
+#define PUBSUB_PUBLISHER_SCOPE "scope"
#define PUBSUB_PUBLISHER_CONFIG "pubsub.config"
#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default"
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/include/pubsub/subscriber.h b/pubsub/pubsub_api/include/pubsub/subscriber.h
index 5d87b8a..ca6d4d1 100644
--- a/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -33,9 +33,8 @@
#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "2.0.0"
//properties
-#define PUBSUB_SUBSCRIBER_TOPIC "pubsub.topic"
-#define PUBSUB_SUBSCRIBER_SCOPE "pubsub.scope"
-#define PUBSUB_SUBSCRIBER_STRATEGY "pubsub.strategy"
+#define PUBSUB_SUBSCRIBER_TOPIC "topic"
+#define PUBSUB_SUBSCRIBER_SCOPE "scope"
#define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config"
#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default"
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.c b/pubsub/pubsub_discovery/src/etcd_watcher.c
index 726269a..fe80c14 100644
--- a/pubsub/pubsub_discovery/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/src/etcd_watcher.c
@@ -127,18 +127,18 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
char scope[MAX_FIELD_LENGTH];
char topic[MAX_FIELD_LENGTH];
char fwUUID[MAX_FIELD_LENGTH];
- char serviceId[MAX_FIELD_LENGTH];
+ char pubsubUUID[MAX_FIELD_LENGTH];
memset(rootPath,0,MAX_ROOTNODE_LENGTH);
memset(topic,0,MAX_FIELD_LENGTH);
memset(fwUUID,0,MAX_FIELD_LENGTH);
- memset(serviceId,0,MAX_FIELD_LENGTH);
+ memset(pubsubUUID,0,MAX_FIELD_LENGTH);
etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
if(expr) {
- int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
+ int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, pubsubUUID);
free(expr);
if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
status = CELIX_ILLEGAL_STATE;
@@ -149,50 +149,28 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
json_error_t error;
json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error);
- const char* endpoint_serializer = NULL;
- const char* endpoint_admin_type = NULL;
- const char* endpoint_url = NULL;
- const char* endpoint_type = NULL;
+ properties_t *discovered_props = properties_create();
- if (json_is_object(jsonRoot)){
+ if (json_is_object(jsonRoot)) {
- void *iter = json_object_iter(jsonRoot);
+ void *iter = json_object_iter(jsonRoot);
- const char *key;
- json_t *value;
+ const char *key;
+ json_t *value;
- while (iter) {
- key = json_object_iter_key(iter);
- value = json_object_iter_value(iter);
+ while (iter) {
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
- if (strcmp(key, PUBSUB_ENDPOINT_SERIALIZER) == 0) {
- endpoint_serializer = json_string_value(value);
- } else if (strcmp(key, PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) {
- endpoint_admin_type = json_string_value(value);
- } else if (strcmp(key, PUBSUB_ENDPOINT_URL) == 0) {
- endpoint_url = json_string_value(value);
- } else if (strcmp(key, PUBSUB_ENDPOINT_TYPE) == 0) {
- endpoint_type = json_string_value(value);
- }
-
- iter = json_object_iter_next(jsonRoot, iter);
- }
-
- if (endpoint_url == NULL) {
- printf("EW: No endpoint found in json object!\n");
- endpoint_url = etcdValue;
- }
-
- } else {
- endpoint_url = etcdValue;
- }
+ properties_set(discovered_props, key, json_string_value(value));
+ iter = json_object_iter_next(jsonRoot, iter);
+ }
+ }
- status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP);
- if (status == CELIX_SUCCESS) {
- status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer);
- status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type);
- status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_TYPE, endpoint_type);
+ status = pubsubEndpoint_createFromDiscoveredProperties(discovered_props, pubEP);
+ if (status != CELIX_SUCCESS) {
+ properties_destroy(discovered_props);
}
if (jsonRoot != NULL) {
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_writer.c b/pubsub/pubsub_discovery/src/etcd_writer.c
index e820e50..37220cc 100644
--- a/pubsub/pubsub_discovery/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/src/etcd_writer.c
@@ -85,9 +85,9 @@ void etcdWriter_destroy(etcd_writer_pt writer) {
memset(dir,0,MAX_ROOTNODE_LENGTH);
snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",
rootPath,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID));
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID));
etcd_del(dir);
pubsubEndpoint_destroy(pubEP);
@@ -106,7 +106,7 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
if(storeEP){
const char *fwUUID = NULL;
bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
- if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
+ if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
celixThreadMutex_lock(&writer->localPubsLock);
pubsub_endpoint_pt p = NULL;
pubsubEndpoint_clone(pubEP, &p);
@@ -134,32 +134,28 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
- asprintf(&key,"%s/%s/%s/%s/%ld",
+ asprintf(&key,"%s/%s/%s/%s/%s",
rootPath,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- pubEP->serviceID);
-
- char serviceID [sizeof(pubEP->serviceID)];
- snprintf(serviceID, sizeof(pubEP->serviceID), "%ld", pubEP->serviceID);
- json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}",
- PUBSUB_ENDPOINT_SERVICE_ID, serviceID,
- PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) stored in endpoint
- PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint
- PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
- PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary
- PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE)
- );
- char* jsonEndpointStr = json_dumps(jsonEndpoint, JSON_COMPACT);
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
+
+
+ json_t *jsEndpoint = json_object();
+ const char* propKey = NULL;
+ PROPERTIES_FOR_EACH(pubEP->endpoint_props, propKey) {
+ const char* val = properties_get(pubEP->endpoint_props, propKey);
+ json_t* jsVal = json_string(val);
+ json_object_set(jsEndpoint, propKey, jsVal);
+ }
+ char* jsonEndpointStr = json_dumps(jsEndpoint, JSON_COMPACT);
if (!etcd_set(key,jsonEndpointStr,ttl,false)) {
status = CELIX_ILLEGAL_ARGUMENT;
}
- FREE_MEM(key);
FREE_MEM(jsonEndpointStr);
- json_decref(jsonEndpoint);
+ json_decref(jsEndpoint);
return status;
}
@@ -170,12 +166,12 @@ celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_
const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
- asprintf(&key, "%s/%s/%s/%s/%ld",
+ asprintf(&key, "%s/%s/%s/%s/%s",
rootPath,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- pubEP->serviceID);
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
celixThreadMutex_lock(&writer->localPubsLock);
for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/psd_activator.c b/pubsub/pubsub_discovery/src/psd_activator.c
index 89a517d..ad1cc4a 100644
--- a/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/pubsub/pubsub_discovery/src/psd_activator.c
@@ -29,7 +29,6 @@
#include "pubsub_common.h"
#include "publisher_endpoint_announce.h"
-#include "pubsub_discovery.h"
#include "pubsub_discovery_impl.h"
struct activator {
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index e3e9704..0c075fc 100644
--- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -42,6 +42,8 @@
#include "pubsub_endpoint.h"
#include "pubsub_discovery_impl.h"
+static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp);
+
/* Discovery activator functions */
celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) {
celix_status_t status = CELIX_SUCCESS;
@@ -56,11 +58,18 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove
(*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
(*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
(*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
+ (*ps_discovery)->verbose = PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE;
celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL);
celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL);
celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL);
}
+ const char *verboseStr = NULL;
+ bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, &verboseStr);
+ if (verboseStr != NULL) {
+ (*ps_discovery)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+ }
+
return status;
}
@@ -119,7 +128,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
if (fwUUID == NULL) {
- printf("PSD: Cannot retrieve fwUUID.\n");
+ fprintf(stderr, "ERROR PSD: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
}
@@ -143,7 +152,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
int i;
for (i = 0; i < arrayList_size(pubEP_list); i++) {
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
- if (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
+ if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP);
} else {
pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false);
@@ -174,10 +183,17 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
celix_status_t status = CELIX_SUCCESS;
- bool inform=false;
+
+ bool valid = pubsub_discovery_isEndpointValid(pubEP);
+ if (!valid) {
+ status = CELIX_ILLEGAL_STATE;
+ return status;
+ }
+
+ bool inform = false;
celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
- char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
if(pubEP_list==NULL){
arrayList_create(&pubEP_list);
@@ -216,12 +232,12 @@ celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery,
bool found = false;
celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
- char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
free(pubs_key);
if (pubEP_list == NULL) {
- printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
status = CELIX_ILLEGAL_STATE;
} else {
int i;
@@ -279,14 +295,25 @@ celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pu
/* Service's functions implementation */
celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) {
celix_status_t status = CELIX_SUCCESS;
- printf("pubsub_discovery_announcePublisher : %s / %s\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+ bool valid = pubsub_discovery_isEndpointValid(pubEP);
+ if (!valid) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ return status;
+ }
+
+ if (pubsub_discovery->verbose) {
+ printf("pubsub_discovery_announcePublisher : %s / %s\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
+
+
+
celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
- char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
if(pubEP_list==NULL){
@@ -308,16 +335,21 @@ celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_
celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) {
celix_status_t status = CELIX_SUCCESS;
-
pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+ bool valid = pubsub_discovery_isEndpointValid(pubEP);
+ if (!valid) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ return status;
+ }
+
celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
- char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
free(pub_key);
if(pubEP_list==NULL){
- printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
status = CELIX_ILLEGAL_STATE;
}
else{
@@ -332,7 +364,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt
}
if(!found){
- printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
+ printf("WARNING PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
status = CELIX_ILLEGAL_STATE;
}
else{
@@ -353,7 +385,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt
celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) {
pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
- char *scope_topic_key = createScopeTopicKey(scope, topic);
+ char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
if(wi) {
@@ -374,7 +406,7 @@ celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scop
celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) {
pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
- char *scope_topic_key = createScopeTopicKey(scope, topic);
+ char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
hash_map_entry_pt entry = hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
@@ -426,7 +458,9 @@ celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_
celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
- printf("PSD: pubsub_tm_announce_publisher added.\n");
+ if (pubsub_discovery->verbose) {
+ printf("PSD: pubsub_tm_announce_publisher added.\n");
+ }
return status;
}
@@ -450,7 +484,9 @@ celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, servic
if (pubsub_discovery->listenerReferences != NULL) {
if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) {
- printf("PSD: pubsub_tm_announce_publisher removed.\n");
+ if (pubsub_discovery->verbose) {
+ printf("PSD: pubsub_tm_announce_publisher removed.\n");
+ }
}
}
celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
@@ -458,3 +494,38 @@ celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, servic
return status;
}
+static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp) {
+ //required properties
+ bool valid = true;
+ static const char* keys[] = {
+ PUBSUB_ENDPOINT_UUID,
+ PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+ PUBSUB_ENDPOINT_TYPE,
+ PUBSUB_ENDPOINT_ADMIN_TYPE,
+ PUBSUB_ENDPOINT_SERIALIZER,
+ PUBSUB_ENDPOINT_TOPIC_NAME,
+ PUBSUB_ENDPOINT_TOPIC_SCOPE,
+ NULL };
+ int i;
+ for (i = 0; keys[i] != NULL; ++i) {
+ const char *val = properties_get(psEp->endpoint_props, keys[i]);
+ if (val == NULL) { //missing required key
+ fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing key: '%s'\n", keys[i]);
+ valid = false;
+ }
+ }
+ if (!valid) {
+ const char *key = NULL;
+ fprintf(stderr, "PubSubEndpoint entries:\n");
+ PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+ fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
+ }
+ if (psEp->topic_props != NULL) {
+ fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
+ PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+ fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
+ }
+ }
+ }
+ return valid;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index 676a6ab..3d25b1c 100644
--- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -29,12 +29,15 @@
#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
+#define PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY "PUBSUB_ETCD_DISCOVERY_VERBOSE"
+#define PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE false
+
struct watcher_info {
etcd_watcher_pt watcher;
int nr_references;
};
-struct pubsub_discovery {
+typedef struct pubsub_discovery {
bundle_context_pt context;
celix_thread_mutex_t discoveredPubsMutex;
@@ -47,7 +50,11 @@ struct pubsub_discovery {
hash_map_pt watchers; //key = topicname, value = struct watcher_info
etcd_writer_pt writer;
-};
+
+ bool verbose;
+} pubsub_discovery_t;
+
+typedef struct pubsub_discovery *pubsub_discovery_pt;
celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_serializer_json/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/src/ps_activator.c b/pubsub/pubsub_serializer_json/src/ps_activator.c
index fec5892..32dd1fc 100644
--- a/pubsub/pubsub_serializer_json/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/src/ps_activator.c
@@ -28,6 +28,7 @@
#include "bundle_activator.h"
#include "service_registration.h"
+#include "pubsub_constants.h"
#include "pubsub_serializer_impl.h"
@@ -70,7 +71,7 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
/* Set serializer type */
properties_pt props = properties_create();
- properties_set(props,PUBSUB_SERIALIZER_TYPE_KEY,PUBSUB_SERIALIZER_TYPE);
+ properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_SERIALIZER_TYPE);
status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, &activator->registration);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
index bd39fc0..607e83a 100644
--- a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
+++ b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
@@ -22,6 +22,10 @@
#include "pubsub_endpoint.h"
+
+//TODO refactor to pubsub_endpoint_announce
+//can be used to announce and remove publisher and subscriber endpoints
+
struct publisher_endpoint_announce {
void *handle;
celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin.h b/pubsub/pubsub_spi/include/pubsub_admin.h
index f24d825..5379415 100644
--- a/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -32,11 +32,8 @@
#include "pubsub_common.h"
#include "pubsub_endpoint.h"
-#define PSA_IP "PSA_IP"
-#define PSA_ITF "PSA_INTERFACE"
-#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+#include "pubsub_constants.h"
-#define PUBSUB_ADMIN_TYPE_KEY "pubsub_admin.type"
typedef struct pubsub_admin *pubsub_admin_pt;
@@ -52,19 +49,20 @@ struct pubsub_admin_service {
celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic);
celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic);
+ //TODO add match function for subscription service and publication listeners, e.g.:
+ //matchPublisherListener(admin, bundle, filter, outScore)
+ //matchSubscriberService(admin, svcRef, outScore)
+
/* Match principle:
- * - A full matching pubsub_admin gives 200 points
- * - A full matching serializer gives 100 points
- * - If QoS = sample
- * - fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75.
- * - fallback serializers order of selection is: json, void. Points allocation is 30,20.
- * - If QoS = control
- * - fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75.
- * - fallback serializers order of selection is: json, void. Points allocation is 30,20.
- * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two.
- *
+ * - A full matching pubsub_admin gives 100 points
*/
+ //TODO this should only be called for remote endpoints (i.e. not endpoints from this framework)
celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+
+ //TODO redesign: add a function for handling endpoints separately, e.g.:
+ //addEndpoint(admin, endpoint);
+ //note that endpoints can be subscribers and publishers
+ //Also note that we can then have pending subscribers and pending (subscriber/publisher) endpoints.
};
typedef struct pubsub_admin_service *pubsub_admin_service_pt;
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin_match.h b/pubsub/pubsub_spi/include/pubsub_admin_match.h
index e95ca7d..08d6582 100644
--- a/pubsub/pubsub_spi/include/pubsub_admin_match.h
+++ b/pubsub/pubsub_spi/include/pubsub_admin_match.h
@@ -31,10 +31,17 @@
#define QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */
#define QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */
-#define PUBSUB_ADMIN_FULL_MATCH_SCORE 200.0F
-#define SERIALIZER_FULL_MATCH_SCORE 100.0F
-
-celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score);
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE 100.0F
+
+celix_status_t pubsub_admin_match(
+ pubsub_endpoint_pt endpoint,
+ const char *pubsub_admin_type,
+ const char *frameworkUuid,
+ double sampleScore,
+ double controlScore,
+ double defaultScore,
+ array_list_pt serializerList,
+ double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out);
#endif /* PUBSUB_ADMIN_MATCH_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_constants.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_constants.h b/pubsub/pubsub_spi/include/pubsub_constants.h
new file mode 100644
index 0000000..47e31d3
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -0,0 +1,30 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_CONSTANTS_H_
+#define PUBSUB_CONSTANTS_H_
+
+#define PSA_IP "PSA_IP"
+#define PSA_ITF "PSA_INTERFACE"
+#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+
+#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config"
+#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type"
+
+#endif /* PUBSUB_CONSTANTS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_endpoint.h b/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 598d673..c0492f5 100644
--- a/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -34,32 +34,48 @@
#include "pubsub/publisher.h"
#include "pubsub/subscriber.h"
-#define PUBSUB_ENDPOINT_ID "pubsub.endpoint.id"
-#define PUBSUB_ENDPOINT_SERVICE_ID "service.id"
-#define PUBSUB_ENDPOINT_SERIALIZER "serializer"
-#define PUBSUB_ENDPOINT_ADMIN_TYPE "pubsub.admin.type"
-#define PUBSUB_ENDPOINT_URL "pubsub.endpoint"
-#define PUBSUB_ENDPOINT_TOPIC "pubsub.topic"
-#define PUBSUB_ENDPOINT_SCOPE "pubsub.scope"
-#define PUBSUB_ENDPOINT_TYPE "pubsub.type"
+#include "pubsub_constants.h"
+
+//required for valid endpoint
+#define PUBSUB_ENDPOINT_UUID "pubsub.endpoint.uuid" //required
+#define PUBSUB_ENDPOINT_FRAMEWORK_UUID "pubsub.framework.uuid" //required
+#define PUBSUB_ENDPOINT_TYPE "pubsub.endpoint.type" //PUBSUB_PUBLISHER_ENDPOINT_TYPE or PUBSUB_SUBSCRIBER_ENDPOINT_TYPE
+#define PUBSUB_ENDPOINT_ADMIN_TYPE PUBSUB_ADMIN_TYPE_KEY
+#define PUBSUB_ENDPOINT_SERIALIZER PUBSUB_SERIALIZER_TYPE_KEY
+#define PUBSUB_ENDPOINT_TOPIC_NAME "pubsub.topic.name"
+#define PUBSUB_ENDPOINT_TOPIC_SCOPE "pubsub.topic.scope"
+
+//optional
+#define PUBSUB_ENDPOINT_SERVICE_ID "pubsub.service.id"
+#define PUBSUB_ENDPOINT_BUNDLE_ID "pubsub.bundle.id"
+#define PUBSUB_ENDPOINT_URL "pubsub.url"
+
+
+#define PUBSUB_PUBLISHER_ENDPOINT_TYPE "pubsub.publisher"
+#define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber"
+
struct pubsub_endpoint {
- long serviceID; //optional
- bool is_secure; //optional
properties_pt endpoint_props;
properties_pt topic_props;
};
typedef struct pubsub_endpoint *pubsub_endpoint_pt;
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp);
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher);
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props, pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t* ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t* ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out);
celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out);
-celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
+void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value);
-char *createScopeTopicKey(const char* scope, const char* topic);
+/**
+ * Creates a pubsub_endpoint based on discovered properties.
+ * Takes ownership of the discoveredProperties.
+ */
+celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out);
+
+char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic);
#endif /* PUBSUB_ENDPOINT_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_serializer.h b/pubsub/pubsub_spi/include/pubsub_serializer.h
index 4489fa4..a91e820 100644
--- a/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -32,8 +32,6 @@
#include "pubsub_common.h"
-#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub_serializer.type"
-
/**
* There should be a pubsub_serializer_t
* per msg type (msg id) per bundle
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_utils.h b/pubsub/pubsub_spi/include/pubsub_utils.h
index eb961c9..e4c08ec 100644
--- a/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -30,10 +30,8 @@
#include "bundle_context.h"
#include "array_list.h"
-char* pubsub_getScopeFromFilter(const char* bundle_filter);
-char* pubsub_getTopicFromFilter(const char* bundle_filter);
+celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, const char **topic, const char **scope);
char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
-array_list_pt pubsub_getTopicsFromString(const char* string);
#endif /* PUBSUB_UTILS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_admin_match.c b/pubsub/pubsub_spi/src/pubsub_admin_match.c
index cfe1dad..5d0fcc9 100644
--- a/pubsub/pubsub_spi/src/pubsub_admin_match.c
+++ b/pubsub/pubsub_spi/src/pubsub_admin_match.c
@@ -19,312 +19,151 @@
#include <string.h>
+#include <limits.h>
+
#include "service_reference.h"
#include "pubsub_admin.h"
#include "pubsub_admin_match.h"
-
-#define KNOWN_PUBSUB_ADMIN_NUM 2
-#define KNOWN_SERIALIZER_NUM 2
-
-static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"};
-static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"};
-
-static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"};
-static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"};
-
-static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
-static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
-
-static void get_serializer_type(service_reference_pt svcRef, char **serializerType);
-static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService);
-
-celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){
+#include "constants.h"
+
+/*
+ * Match can be called by
+ * a) a local registered pubsub_subscriber service
+ * b) a local opened service tracker for a pubsub_publisher service
+ * c) a remote found publisher endpoint
+ * Note: subscribers are not (yet) discovered remotely
+ */
+celix_status_t pubsub_admin_match(
+ pubsub_endpoint_pt endpoint,
+ const char *pubsub_admin_type,
+ const char *frameworkUuid,
+ double sampleScore,
+ double controlScore,
+ double defaultScore,
+ array_list_pt serializerList,
+ double *out) {
celix_status_t status = CELIX_SUCCESS;
- double final_score = 0;
- int i = 0, j = 0;
+ double score = 0;
+
+ const char *endpointFrameworkUuid = NULL;
+ const char *endpointAdminType = NULL;
const char *requested_admin_type = NULL;
- const char *requested_serializer_type = NULL;
const char *requested_qos_type = NULL;
- if(endpoint_props!=NULL){
- requested_admin_type = properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
- requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
- requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+ if (endpoint->endpoint_props != NULL) {
+ endpointFrameworkUuid = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+ endpointAdminType = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_ADMIN_TYPE);
}
-
- /* Analyze the pubsub_admin */
- if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */
- if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match
- final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
- }
- }
- else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */
- if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
- for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
- if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
- final_score += qos_pubsub_admin_score[i];
- break;
- }
- }
- }
- else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
- for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
- if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
- final_score += qos_pubsub_admin_score[i];
- break;
- }
- }
- }
- else{
- printf("Unknown QoS type '%s'\n",requested_qos_type);
- status = CELIX_ILLEGAL_ARGUMENT;
- }
- }
- else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
- for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
- if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
- final_score += (qos_pubsub_admin_score[i]/2);
- break;
- }
- }
+ if (endpoint->topic_props != NULL) {
+ requested_admin_type = properties_get(endpoint->topic_props, PUBSUB_ADMIN_TYPE_KEY);
+ requested_qos_type = properties_get(endpoint->topic_props, QOS_ATTRIBUTE_KEY);
}
- char *serializer_type = NULL;
- /* Analyze the serializers */
- if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */
- for(i=0;i<arrayList_size(serializerList);i++){
- service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i);
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
- final_score += SERIALIZER_FULL_MATCH_SCORE;
- break;
- }
- }
- }
- }
- else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */
- if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
- bool ser_found = false;
- for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
- for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
- service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
- ser_found = true;
- }
- }
- }
- if(ser_found){
- final_score += qos_serializer_score[i];
- }
+ if (endpointFrameworkUuid != NULL && frameworkUuid != NULL && strncmp(frameworkUuid, endpointFrameworkUuid, 128) == 0) {
+ //match for local subscriber or publisher
+
+ /* Analyze the pubsub_admin */
+ if (requested_admin_type != NULL) { /* We got precise specification on the pubsub_admin we want */
+ if (strncmp(requested_admin_type, pubsub_admin_type, strlen(pubsub_admin_type)) == 0) { //Full match
+ score = PUBSUB_ADMIN_FULL_MATCH_SCORE;
}
- }
- else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
- bool ser_found = false;
- for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
- for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
- service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
- ser_found = true;
- }
- }
- }
- if(ser_found){
- final_score += qos_serializer_score[i];
- }
+ } else if (requested_qos_type != NULL) { /* We got QoS specification that will determine the selected PSA */
+ if (strncmp(requested_qos_type, QOS_TYPE_SAMPLE, strlen(QOS_TYPE_SAMPLE)) == 0) {
+ score = sampleScore;
+ } else if (strncmp(requested_qos_type, QOS_TYPE_CONTROL, strlen(QOS_TYPE_CONTROL)) == 0) {
+ score += controlScore;
+ } else {
+ printf("Unknown QoS type '%s'\n", requested_qos_type);
+ status = CELIX_ILLEGAL_ARGUMENT;
}
+ } else { /* We got no specification: fallback to default score */
+ score = defaultScore;
}
- else{
- printf("Unknown QoS type '%s'\n",requested_qos_type);
- status = CELIX_ILLEGAL_ARGUMENT;
+
+ //NOTE the serializer influences the score: if a specific serializer is configured but not available, the score becomes 0.
+ //Get the best serializer. This is based on service ranking or on the requested serializer. If a serializer is requested and no match is found, NULL is returned.
+ service_reference_pt serSvcRef = NULL;
+ pubsub_admin_get_best_serializer(endpoint->topic_props, serializerList, &serSvcRef);
+ const char *serType = NULL; //for printing info
+ if (serSvcRef == NULL) {
+ score = 0;
+ } else {
+ serviceReference_getProperty(serSvcRef, PUBSUB_SERIALIZER_TYPE_KEY, &serType);
}
- }
- else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
- bool ser_found = false;
- for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
- for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
- service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
- ser_found = true;
- }
- }
- }
- if(ser_found){
- final_score += (qos_serializer_score[i]/2);
- }
+
+ printf("Score for psa type %s is %f. Serializer used is '%s'\n", pubsub_admin_type, score, serType);
+ } else {
+ //remote publisher. score will be 0 or 100. nothing else.
+ //TODO FIXME remote publisher should go through a different process. Currently it is confusing what to match
+ if (endpointAdminType == NULL) {
+ score = 0;
+
+// const char *key = NULL;
+// printf("Endpoint properties:\n");
+// PROPERTIES_FOR_EACH(endpoint->endpoint_props, key) {
+// printf("\t%s=%s\n", key, properties_get(endpoint->endpoint_props, key));
+// }
+
+ fprintf(stderr, "WARNING PSA MATCH: remote publisher has no type. The key '%s' must be specified\n", PUBSUB_ENDPOINT_ADMIN_TYPE);
+ } else {
+ score = strncmp(endpointAdminType, pubsub_admin_type, 1024) == 0 ? 100 : 0;
}
+ printf("Score for psa type %s is %f. Publisher is remote\n", pubsub_admin_type, score);
}
- *score = final_score;
- printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score);
+ *out = score;
return status;
}
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
+/*
+ * Selects a serializer service reference from serializerList and stores it in out.
+ * If endpoint_props contains PUBSUB_SERIALIZER_TYPE_KEY, the first reference whose
+ * serializer type matches the requested type is selected; out is set to NULL when
+ * no reference matches.
+ * Without a requested type, the first reference is the initial candidate and is
+ * replaced by any later reference that has a serializer type and a higher service ranking.
+ * The returned status is always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out){
celix_status_t status = CELIX_SUCCESS;
-
- int i = 0, j = 0;
-
+ int i;
const char *requested_serializer_type = NULL;
- const char *requested_qos_type = NULL;
if (endpoint_props != NULL){
requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
- requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
}
service_reference_pt svcRef = NULL;
- void *svc = NULL;
-
- /* Analyze the serializers */
- if (arrayList_size(serializerList) == 1) {
- // Only one serializer, use this one
- svcRef = (service_reference_pt)arrayList_get(serializerList,0);
- manage_service_from_reference(svcRef, &svc, true);
- *serSvc = svc;
- char *serializer_type = NULL;
- get_serializer_type(svcRef, &serializer_type);
- printf("Selected the only serializer available. Type = %s\n", serializer_type);
-
- }
- else if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */
- for(i=0;i<arrayList_size(serializerList);i++){
- svcRef = (service_reference_pt)arrayList_get(serializerList,i);
- char *serializer_type = NULL;
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
- manage_service_from_reference(svcRef, &svc,true);
- if(svc==NULL){
- printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
- status = CELIX_SERVICE_EXCEPTION;
- }
- *serSvc = svc;
- break;
- }
- }
- }
- }
- else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */
- if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
- bool ser_found = false;
- for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
- for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
- svcRef = (service_reference_pt)arrayList_get(serializerList,j);
- char *serializer_type = NULL;
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
- manage_service_from_reference(svcRef, &svc,true);
- if(svc==NULL){
- printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
- status = CELIX_SERVICE_EXCEPTION;
- }
- else{
- *serSvc = svc;
- ser_found = true;
- printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
- }
- }
- }
- }
- }
- }
- else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
- bool ser_found = false;
- for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
- for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
- svcRef = (service_reference_pt)arrayList_get(serializerList,j);
- char *serializer_type = NULL;
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
- manage_service_from_reference(svcRef, &svc,true);
- if(svc==NULL){
- printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
- status = CELIX_SERVICE_EXCEPTION;
- }
- else{
- *serSvc = svc;
- ser_found = true;
- printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
- }
- }
- }
- }
- }
- }
- else{
- printf("Unknown QoS type '%s'\n",requested_qos_type);
- status = CELIX_ILLEGAL_ARGUMENT;
- }
- }
- else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
- bool ser_found = false;
- for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
- for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
- svcRef = (service_reference_pt)arrayList_get(serializerList,j);
- char *serializer_type = NULL;
- get_serializer_type(svcRef, &serializer_type);
- if(serializer_type != NULL){
- if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
- manage_service_from_reference(svcRef, &svc,true);
- if(svc==NULL){
- printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
- status = CELIX_SERVICE_EXCEPTION;
- }
- else{
- *serSvc = svc;
- ser_found = true;
- printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]);
- }
- }
- }
+ service_reference_pt best = NULL;
+ long hightestRanking = LONG_MIN;
+
+ if (requested_serializer_type != NULL) {
+ //a specific serializer is requested -> take the first reference with a matching type
+ for (i = 0; i < arrayList_size(serializerList); ++i) {
+ svcRef = (service_reference_pt) arrayList_get(serializerList, i); //inspect entry i, not always the first entry
+ const char* currentSerType = NULL;
+ serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, &currentSerType);
+ if (currentSerType != NULL && strncmp(requested_serializer_type, currentSerType, 128) == 0) {
+ best = svcRef;
+ break;
+ }
+ }
+ } else {
+ //no specific serializer request -> search for highest ranking serializer service
+ for (i = 0; i < arrayList_size(serializerList); ++i) {
+ svcRef = (service_reference_pt)arrayList_get(serializerList, i); //inspect entry i, not always the first entry
+ const char *service_ranking_str = NULL;
+ const char* currentSerType = NULL;
+ serviceReference_getProperty(svcRef, OSGI_FRAMEWORK_SERVICE_RANKING, &service_ranking_str);
+ serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, &currentSerType);
+ //a reference without a ranking property gets the lowest possible ranking
+ long svcRanking = service_ranking_str == NULL ? LONG_MIN : strtol(service_ranking_str, NULL, 10);
+ if (best == NULL || (svcRanking > hightestRanking && currentSerType != NULL)) {
+ best = svcRef;
+ hightestRanking = svcRanking;
+ }
+ if (currentSerType == NULL) {
+ fprintf(stderr, "Invalid pubsub_serializer service. Must have a property '%s'\n", PUBSUB_SERIALIZER_TYPE_KEY);
}
- }
- }
-
- if(svc!=NULL && svcRef!=NULL){
- manage_service_from_reference(svcRef, svc, false);
- }
-
- return status;
-}
+ }
+ }
-static void get_serializer_type(service_reference_pt svcRef, char **serializerType){
+ *out = best;
- const char *serType = NULL;
- serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
- if(serType != NULL){
- *serializerType = (char*)serType;
- }
- else{
- printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef);
- *serializerType = NULL;
- }
-}
-
-static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){
- bundle_context_pt context = NULL;
- bundle_pt bundle = NULL;
- serviceReference_getBundle(svcRef, &bundle);
- bundle_getContext(bundle, &context);
- if(getService){
- bundleContext_getService(context, svcRef, svc);
- }
- else{
- bundleContext_ungetService(context, svcRef, NULL);
- }
-}
+ return status;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c b/pubsub/pubsub_spi/src/pubsub_endpoint.c
index d3b746e..1950433 100644
--- a/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -38,10 +38,11 @@
#include "pubsub_utils.h"
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps);
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId,const char* endpoint, const char *pubsubType, properties_pt topic_props);
static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher);
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp);
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){
+/*
+ * Fills the endpoint properties of psEp (the properties are created when not yet present).
+ * A freshly generated uuid is stored under PUBSUB_ENDPOINT_UUID. The framework uuid, scope,
+ * topic, url (endpoint) and pubsub type are only set when the argument is not NULL; the
+ * bundle id and service id are only set when they are >= 0.
+ * When topic_props is not NULL it is copied into psEp->topic_props.
+ */
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char *pubsubType, properties_pt topic_props) {
if (psEp->endpoint_props == NULL) {
psEp->endpoint_props = properties_create();
@@ -52,33 +53,43 @@ static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID
uuid_t endpointUid;
uuid_generate(endpointUid);
uuid_unparse(endpointUid, endpointUuid);
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, endpointUuid);
+
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_UUID, endpointUuid);
if (fwUUID != NULL) {
- properties_set(psEp->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID);
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
}
if (scope != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE, scope);
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
}
if (topic != NULL) {
- properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC, topic);
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME, topic);
}
- psEp->serviceID = serviceId;
+ char idBuf[32]; //scratch buffer for the textual form of the bundle id / service id
+
+ if (bundleId >= 0) {
+ snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID, idBuf);
+ }
+
+ if (serviceId >= 0) {
+ snprintf(idBuf, sizeof(idBuf), "%li", serviceId); //format the service id, not the bundle id
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, idBuf);
+ }
if(endpoint != NULL) {
properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, endpoint);
}
- if(topic_props != NULL){
- if(cloneProps){
- properties_copy(topic_props, &(psEp->topic_props));
- }
- else{
- psEp->topic_props = topic_props;
- }
+ if (pubsubType != NULL) {
+ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TYPE, pubsubType);
+ }
+
+ if(topic_props != NULL) {
+ properties_copy(topic_props, &(psEp->topic_props));
}
}
@@ -129,12 +140,22 @@ celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, c
return status;
}
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props,pubsub_endpoint_pt* out){
celix_status_t status = CELIX_SUCCESS;
- *psEp = calloc(1, sizeof(**psEp));
+ pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
- pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true);
+ pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, serviceId, endpoint, pubsubType, topic_props);
+
+ if (!pubsubEndpoint_isEndpointValid(psEp)) {
+ status = CELIX_ILLEGAL_STATE;
+ }
+
+ if (status == CELIX_SUCCESS) {
+ *out = psEp;
+ } else {
+ pubsubEndpoint_destroy(psEp);
+ }
return status;
@@ -151,9 +172,6 @@ celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *o
status += properties_copy(in->topic_props, &(ep->topic_props));
}
- ep->serviceID = in->serviceID;
- ep->is_secure = in->is_secure;
-
if (status == CELIX_SUCCESS) {
*out = ep;
} else {
@@ -161,23 +179,18 @@ celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *o
}
return status;
-
}
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){
+celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t *ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out){
celix_status_t status = CELIX_SUCCESS;
pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
- bundle_pt bundle = NULL;
- bundle_context_pt ctxt = NULL;
const char* fwUUID = NULL;
- serviceReference_getBundle(reference,&bundle);
- bundle_getContext(bundle,&ctxt);
- bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+ bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
const char* scope = NULL;
- serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
+ serviceReference_getPropertyWithDefault(reference, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, &scope);
const char* topic = NULL;
serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
@@ -185,70 +198,105 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re
const char* serviceId = NULL;
serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
+
+ long bundleId = -1;
+ bundle_pt bundle = NULL;
+ serviceReference_getBundle(reference, &bundle);
+ if (bundle != NULL) {
+ bundle_getBundleId(bundle, &bundleId);
+ }
+
/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
- pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false);
+ const char *pubsubType = isPublisher ? PUBSUB_PUBLISHER_ENDPOINT_TYPE : PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
- if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) ||
- !ep->serviceID ||
- !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_SCOPE) ||
- !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC)) {
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, bundleId, strtol(serviceId,NULL,10), NULL, pubsubType, topic_props);
- fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
- status = CELIX_BUNDLE_EXCEPTION;
- pubsubEndpoint_destroy(ep);
- *psEp = NULL;
- }
- else{
- *psEp = ep;
- }
+ if (!pubsubEndpoint_isEndpointValid(ep)) {
+ status = CELIX_ILLEGAL_STATE;
+ }
+
+ if (status == CELIX_SUCCESS) {
+ *out = ep;
+ } else {
+ pubsubEndpoint_destroy(ep);
+ }
return status;
}
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){
+/*
+ * Creates a pubsub endpoint that uses discoveredProperties as its endpoint properties.
+ * The new endpoint is stored in out only when the allocation succeeded and all required
+ * endpoint keys are present. Otherwise CELIX_ENOMEM (allocation failure) or
+ * CELIX_ILLEGAL_STATE (invalid endpoint) is returned and out is left untouched.
+ * NOTE(review): on allocation failure discoveredProperties is not released here - confirm
+ * who should destroy it in that case.
+ */
+celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out) {
+ celix_status_t status = CELIX_SUCCESS;
+ pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
+ if (psEp != NULL) {
+ psEp->endpoint_props = discoveredProperties;
+ } else {
+ status = CELIX_ENOMEM;
+ }
+
+ //only validate an endpoint that was actually allocated; this also keeps CELIX_ENOMEM from being overwritten
+ if (status == CELIX_SUCCESS && !pubsubEndpoint_isEndpointValid(psEp)) {
+ status = CELIX_ILLEGAL_STATE;
+ }
+
+ if (status == CELIX_SUCCESS) {
+ *out = psEp;
+ } else {
+ pubsubEndpoint_destroy(psEp);
+ }
+
+ return status;
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out){
celix_status_t status = CELIX_SUCCESS;
const char* fwUUID=NULL;
- bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+ bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
- if(fwUUID==NULL){
+ if( fwUUID==NULL) {
return CELIX_BUNDLE_EXCEPTION;
}
- char* topic = pubsub_getTopicFromFilter(info->filter);
- if(topic==NULL){
+ const char* topic = NULL;
+ const char* scope = NULL;
+ pubsub_getPubSubInfoFromFilter(info->filter, &topic, &scope);
+
+ if (topic==NULL) {
return CELIX_BUNDLE_EXCEPTION;
}
-
- *psEp = calloc(1, sizeof(**psEp));
-
- char* scope = pubsub_getScopeFromFilter(info->filter);
if(scope == NULL) {
scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
}
+ pubsub_endpoint_pt psEp = calloc(1, sizeof(**out));
+
bundle_pt bundle = NULL;
long bundleId = -1;
bundleContext_getBundle(info->context,&bundle);
-
bundle_getBundleId(bundle,&bundleId);
properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
- pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false);
+ pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, -1, NULL, PUBSUB_PUBLISHER_ENDPOINT_TYPE, topic_props);
- free(topic);
- free(scope);
+ if (!pubsubEndpoint_isEndpointValid(psEp)) {
+ status = CELIX_ILLEGAL_STATE;
+ }
+ if (status == CELIX_SUCCESS) {
+ *out = psEp;
+ } else {
+ pubsubEndpoint_destroy(psEp);
+ }
return status;
}
-celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+ if (psEp == NULL) return;
if(psEp->topic_props != NULL){
properties_destroy(psEp->topic_props);
@@ -260,23 +308,53 @@ celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
free(psEp);
- return CELIX_SUCCESS;
+ return;
}
bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
- return ((strcmp(properties_get(psEp1->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) &&
- (strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_SCOPE))==0) &&
- (strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_TOPIC))==0) &&
- (psEp1->serviceID == psEp2->serviceID) /*&&
- ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
- );
+ //two endpoints are equal when their endpoint uuids are identical
+ const char *uuid1 = properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_UUID);
+ const char *uuid2 = properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_UUID);
+ if (uuid1 == NULL || uuid2 == NULL) {
+ return false; //an endpoint without uuid cannot be compared
+ }
+ //strcmp returns 0 for identical strings, so the result must be compared with 0
+ return strcmp(uuid1, uuid2) == 0;
}
-char *createScopeTopicKey(const char* scope, const char* topic) {
+char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
char *result = NULL;
asprintf(&result, "%s:%s", scope, topic);
return result;
}
+
+
+/*
+ * Checks whether the endpoint properties of psEp contain all required keys
+ * (endpoint uuid, framework uuid, endpoint type, topic name and topic scope).
+ * Every missing key is reported on stderr; for an invalid endpoint the endpoint
+ * properties and, when present, the topic properties are printed as well.
+ * Returns false for a NULL endpoint or an endpoint without endpoint properties.
+ */
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp) {
+ //nothing to inspect without an endpoint or endpoint properties
+ if (psEp == NULL || psEp->endpoint_props == NULL) {
+ fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint, no endpoint properties available\n");
+ return false;
+ }
+
+ //required properties
+ bool valid = true;
+ static const char* keys[] = {
+ PUBSUB_ENDPOINT_UUID,
+ PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+ PUBSUB_ENDPOINT_TYPE,
+ PUBSUB_ENDPOINT_TOPIC_NAME,
+ PUBSUB_ENDPOINT_TOPIC_SCOPE,
+ NULL };
+ int i;
+ for (i = 0; keys[i] != NULL; ++i) {
+ const char *val = properties_get(psEp->endpoint_props, keys[i]);
+ if (val == NULL) { //missing required key
+ fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint missing key: '%s'\n", keys[i]);
+ valid = false;
+ }
+ }
+ if (!valid) {
+ //print all entries to help finding out which endpoint is incomplete
+ const char *key = NULL;
+ fprintf(stderr, "PubSubEndpoint entries:\n");
+ PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+ fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
+ }
+ if (psEp->topic_props != NULL) {
+ fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
+ PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+ fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
+ }
+ }
+ }
+ return valid;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c b/pubsub/pubsub_spi/src/pubsub_utils.c
index 53bacb8..daf4e9f 100644
--- a/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -42,85 +42,43 @@
#define MAX_KEYBUNDLE_LENGTH 256
-char* pubsub_getScopeFromFilter(const char* bundle_filter){
- char* scope = NULL;
-
- char* filter = strdup(bundle_filter);
-
- char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
- if(oc!=NULL){
- oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
- if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
-
- char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
- if(scopes!=NULL){
-
- scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
- char* bottom=strchr(scopes,')');
- *bottom='\0';
-
- scope=strdup(scopes);
- } else {
- scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, const char **topicOut, const char **scopeOut) { //parses filterstr and reports the topic and scope values of a publisher filter through topicOut/scopeOut
+ celix_status_t status = CELIX_SUCCESS;
+ const char *topic = NULL;
+ const char *scope = NULL;
+ const char *objectClass = NULL;
+ celix_filter_t *filter = filter_create(filterstr); //NOTE(review): no filter_destroy call is visible in this function, while topic/scope below point at values held by this filter's children
+ if (filter != NULL) {
+ if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only an AND filter is accepted as a pubsub filter, e.g. (&(objectClass=pubsub.publisher)(topic=exmpl))
+ array_list_t *attributes = filter->children;
+ unsigned int i;
+ unsigned int size = arrayList_size(attributes);
+ for (i = 0; i < size; ++i) {
+ filter_t *attr = arrayList_get(attributes, i);
+ if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) { //only (key=value) children are inspected
+ if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) {
+ objectClass = attr->value;
+ } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
+ topic = attr->value;
+ } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
+ scope = attr->value;
+ }
+ }
}
}
}
- free(filter);
-
- return scope;
-}
-
-char* pubsub_getTopicFromFilter(const char* bundle_filter){
-
- char* topic = NULL;
-
- char* filter = strdup(bundle_filter);
-
- char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
- if(oc!=NULL){
- oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
- if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
-
- char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
- if(topics!=NULL){
-
- topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
- char* bottom=strchr(topics,')');
- *bottom='\0';
-
- topic=strdup(topics);
-
- }
- }
+ if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) { //a topic and a publisher objectClass are required; scope is passed through even when it is NULL
+ *topicOut = topic;
+ *scopeOut = scope;
+ } else { //not a usable publisher filter: both outputs are cleared
+ *topicOut = NULL;
+ *scopeOut = NULL;
}
-
- free(filter);
-
- return topic;
-
+ return status; //status is never reassigned, so this is CELIX_SUCCESS even when filter_create returned NULL
}
-array_list_pt pubsub_getTopicsFromString(const char* string){
-
- array_list_pt topic_list = NULL;
- arrayList_create(&topic_list);
-
- char* topics = strdup(string);
-
- char* topic = strtok(topics,",;|# ");
- arrayList_add(topic_list,strdup(topic));
-
- while( (topic = strtok(NULL,",;|# ")) !=NULL){
- arrayList_add(topic_list,strdup(topic));
- }
-
- free(topics);
-
- return topic_list;
-
-}
/**
* Loop through all bundles and look for the bundle with the keys inside.
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 71a9ad9..5b983d4 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -110,6 +110,13 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
(*manager)->shellCmdService.handle = *manager;
(*manager)->shellCmdService.executeCommand = shellCommand;
+ (*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE;
+ const char *verboseStr = NULL;
+ bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, &verboseStr);
+ if (verboseStr != NULL) {
+ (*manager)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+ }
+
properties_pt shellProps = properties_create();
properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
@@ -182,6 +189,8 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
celixThreadMutex_lock(&manager->subscriptionsLock);
hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
+ //TODO FIXME no matching is used here; should this only add unmatched subscribers?
+ //NOTE this is a bug which occurs when PSAs are started after the bundles that use them
while (hashMapIterator_hasNext(subscriptionsIterator)) {
array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
for(i=0;i<arrayList_size(sub_ep_list);i++){
@@ -197,6 +206,8 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
status = celixThreadMutex_lock(&manager->publicationsLock);
hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
+ //TODO FIXME no matching is used here; should this only add unmatched publications?
+ //NOTE this is a bug which occurs when PSAs are started after the bundles that use them
while (hashMapIterator_hasNext(publicationsIterator)) {
array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
for(i=0;i<arrayList_size(pub_ep_list);i++){
@@ -252,7 +263,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
unsigned int i;
for(i=0;i<arrayList_size(pubEP_list);i++){
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
+ if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
disc->removePublisher(disc->handle,pubEP);
}
}
@@ -297,9 +308,9 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
//subscriber_service_pt subscriber = (subscriber_service_pt)service;
pubsub_endpoint_pt sub = NULL;
- if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){
+ if(pubsubEndpoint_createFromServiceReference(manager->context, reference,false, &sub) == CELIX_SUCCESS){
celixThreadMutex_lock(&manager->subscriptionsLock);
- char *sub_key = createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
if(sub_list_by_topic==NULL){
@@ -319,13 +330,13 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
for(j=0;j<arrayList_size(manager->psaList);j++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
psa->matchEndpoint(psa->admin,sub,&score);
- if(score>best_score){ /* We have a new winner! */
+ if (score > best_score) { /* We have a new winner! */
best_score = score;
best_psa = psa;
}
}
- if(best_psa != NULL && best_score>0){
+ if (best_psa != NULL && best_score>0) {
best_psa->addSubscription(best_psa->admin,sub);
}
@@ -336,7 +347,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
publisher_endpoint_announce_pt disc = NULL;
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
bundleContext_ungetService(manager->context, disc_sr, NULL);
}
hashMapIterator_destroy(iter);
@@ -364,7 +375,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
pubsub_topology_manager_pt manager = handle;
pubsub_endpoint_pt subcmp = NULL;
- if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
+ if(pubsubEndpoint_createFromServiceReference(manager->context, reference, false, &subcmp) == CELIX_SUCCESS){
unsigned int j,k;
@@ -375,7 +386,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
publisher_endpoint_announce_pt disc = NULL;
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
bundleContext_ungetService(manager->context, disc_sr, NULL);
}
hashMapIterator_destroy(iter);
@@ -384,7 +395,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
celixThreadMutex_lock(&manager->subscriptionsLock);
celixThreadMutex_lock(&manager->psaListLock);
- char *sub_key = createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
free(sub_key);
if(sub_list_by_topic!=NULL){
@@ -404,7 +415,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
if(arrayList_size(sub_list_by_topic)==0){
for(k=0;k<arrayList_size(manager->psaList);k++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
}
}
@@ -451,7 +462,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
+ if( (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
status += disc->announcePublisher(disc->handle,pubEP);
}
}
@@ -469,7 +480,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
for(i=0;i<arrayList_size(l);i++){
pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
- disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
}
}
hashMapIterator_destroy(iter);
@@ -517,10 +528,10 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
listener_hook_info_pt info = arrayList_get(listeners, l_index);
pubsub_endpoint_pt pub = NULL;
- if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){
+ if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub) == CELIX_SUCCESS){
celixThreadMutex_lock(&manager->publicationsLock);
- char *pub_key = createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
if(pub_list_by_topic==NULL){
arrayList_create(&pub_list_by_topic);
@@ -546,7 +557,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
}
}
- if(best_psa != NULL && best_score>0){
+ if (best_psa != NULL && best_score > 0) {
status = best_psa->addPublication(best_psa->admin,pub);
if(status==CELIX_SUCCESS){
celixThreadMutex_lock(&manager->discoveryListLock);
@@ -585,14 +596,14 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
listener_hook_info_pt info = arrayList_get(listeners, l_index);
pubsub_endpoint_pt pubcmp = NULL;
- if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){
+ if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){
unsigned int j,k;
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
- char *pub_key = createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
if(pub_list_by_topic!=NULL){
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
@@ -625,7 +636,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
if(arrayList_size(pub_list_by_topic)==0){
for(k=0;k<arrayList_size(manager->psaList);k++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
}
}
@@ -651,16 +662,20 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ pubsub_topology_manager_pt manager = handle;
+
+ if (manager->verbose) {
+ printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
+
- pubsub_topology_manager_pt manager = handle;
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
- char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
if(pub_list_by_topic==NULL){
@@ -672,7 +687,7 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
pubsub_endpoint_pt p = NULL;
pubsubEndpoint_clone(pubEP, &p);
- arrayList_add(pub_list_by_topic,p);
+ arrayList_add(pub_list_by_topic , p);
unsigned int j;
double score = 0;
@@ -681,14 +696,16 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
for(j=0;j<arrayList_size(manager->psaList);j++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
- psa->matchEndpoint(psa->admin,p,&score);
- if(score>best_score){ /* We have a new winner! */
+ psa->matchEndpoint(psa->admin , p, &score);
+ if (score>best_score) { /* We have a new winner! */
best_score = score;
best_psa = psa;
}
}
- if(best_psa != NULL && best_score>0){
+ if(best_psa != NULL && best_score>0) {
+ //TODO FIXME this is the same call as used by publisher of service trackers. This is confusing.
+ //remotely discovered publications could be handled differently.
best_psa->addPublication(best_psa->admin,p);
}
else{
@@ -703,20 +720,24 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ pubsub_topology_manager_pt manager = handle;
+
+ if (manager->verbose) {
+ printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
+ }
- pubsub_topology_manager_pt manager = handle;
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
unsigned int i;
- char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
if(pub_list_by_topic==NULL){
- printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
status = CELIX_ILLEGAL_STATE;
}
else{
@@ -744,7 +765,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
for(i=0;i<arrayList_size(manager->psaList);i++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
- psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
}
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index cdcc651..769048d 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -37,6 +37,9 @@
#include "pubsub/publisher.h"
#include "pubsub/subscriber.h"
+#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
+#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
+
struct pubsub_topology_manager {
bundle_context_pt context;
@@ -58,6 +61,8 @@ struct pubsub_topology_manager {
log_helper_pt loghelper;
+
+ bool verbose;
};
typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
[2/3] celix git commit: Refactors the pubsub spi and fixes an issue
with pubsub endpoint matching.
Posted by pn...@apache.org.
Refactors the pubsub spi and fixes an issue with pubsub endpoint matching.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b8f13870
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b8f13870
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b8f13870
Branch: refs/heads/develop
Commit: b8f138708b71850a4802396e6793257bc95f5314
Parents: c314016
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Feb 20 13:30:36 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Feb 20 13:30:36 2018 +0100
----------------------------------------------------------------------
framework/include/service_reference.h | 3 +
framework/src/service_reference.c | 9 +-
pubsub/examples/CMakeLists.txt | 27 +-
.../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 246 ++++++++----
.../pubsub_admin_udp_mc/src/pubsub_admin_impl.h | 10 +-
.../src/pubsub_psa_udpmc_constants.h | 39 ++
.../pubsub_admin_udp_mc/src/topic_publication.c | 51 ++-
.../pubsub_admin_udp_mc/src/topic_publication.h | 4 +-
pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c | 279 ++++++++++----
pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h | 18 +-
.../src/pubsub_psa_zmq_constants.h | 48 +++
pubsub/pubsub_admin_zmq/src/topic_publication.c | 37 +-
pubsub/pubsub_admin_zmq/src/topic_publication.h | 2 +-
.../pubsub_admin_zmq/src/topic_subscription.c | 17 +-
.../pubsub_admin_zmq/src/topic_subscription.h | 2 +-
pubsub/pubsub_api/include/pubsub/publisher.h | 7 +-
pubsub/pubsub_api/include/pubsub/subscriber.h | 5 +-
pubsub/pubsub_discovery/src/etcd_watcher.c | 58 +--
pubsub/pubsub_discovery/src/etcd_writer.c | 54 ++-
pubsub/pubsub_discovery/src/psd_activator.c | 1 -
.../src/pubsub_discovery_impl.c | 109 +++++-
.../src/pubsub_discovery_impl.h | 11 +-
.../pubsub_serializer_json/src/ps_activator.c | 3 +-
.../include/publisher_endpoint_announce.h | 4 +
pubsub/pubsub_spi/include/pubsub_admin.h | 26 +-
pubsub/pubsub_spi/include/pubsub_admin_match.h | 17 +-
pubsub/pubsub_spi/include/pubsub_constants.h | 30 ++
pubsub/pubsub_spi/include/pubsub_endpoint.h | 46 ++-
pubsub/pubsub_spi/include/pubsub_serializer.h | 2 -
pubsub/pubsub_spi/include/pubsub_utils.h | 4 +-
pubsub/pubsub_spi/src/pubsub_admin_match.c | 383 ++++++-------------
pubsub/pubsub_spi/src/pubsub_endpoint.c | 204 +++++++---
pubsub/pubsub_spi/src/pubsub_utils.c | 100 ++---
.../src/pubsub_topology_manager.c | 93 +++--
.../src/pubsub_topology_manager.h | 5 +
35 files changed, 1159 insertions(+), 795 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/framework/include/service_reference.h
----------------------------------------------------------------------
diff --git a/framework/include/service_reference.h b/framework/include/service_reference.h
index bb263f5..3d84526 100644
--- a/framework/include/service_reference.h
+++ b/framework/include/service_reference.h
@@ -48,6 +48,9 @@ FRAMEWORK_EXPORT celix_status_t
serviceReference_getProperty(service_reference_pt reference, const char *key, const char **value);
FRAMEWORK_EXPORT celix_status_t
+serviceReference_getPropertyWithDefault(service_reference_pt reference, const char *key, const char* def, const char **value);
+
+FRAMEWORK_EXPORT celix_status_t
serviceReference_getPropertyKeys(service_reference_pt reference, char **keys[], unsigned int *size);
FRAMEWORK_EXPORT celix_status_t
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/framework/src/service_reference.c
----------------------------------------------------------------------
diff --git a/framework/src/service_reference.c b/framework/src/service_reference.c
index 545426d..e39e912 100644
--- a/framework/src/service_reference.c
+++ b/framework/src/service_reference.c
@@ -203,14 +203,15 @@ celix_status_t serviceReference_getServiceRegistration(service_reference_pt ref,
}
}
-celix_status_t serviceReference_getProperty(service_reference_pt ref, const char* key, const char** value) {
+FRAMEWORK_EXPORT celix_status_t
+serviceReference_getPropertyWithDefault(service_reference_pt ref, const char *key, const char* def, const char **value) {
celix_status_t status = CELIX_SUCCESS;
properties_pt props = NULL;
celixThreadRwlock_readLock(&ref->lock);
if (ref->registration != NULL) {
status = serviceRegistration_getProperties(ref->registration, &props);
if (status == CELIX_SUCCESS) {
- *value = (char*) properties_get(props, key);
+ *value = (char*) properties_getWithDefault(props, key, def);
}
} else {
*value = NULL;
@@ -219,6 +220,10 @@ celix_status_t serviceReference_getProperty(service_reference_pt ref, const char
return status;
}
+celix_status_t serviceReference_getProperty(service_reference_pt ref, const char* key, const char** value) { //convenience wrapper around serviceReference_getPropertyWithDefault
+ return serviceReference_getPropertyWithDefault(ref, key, NULL, value); //delegates with NULL as the default value
+}
+
FRAMEWORK_EXPORT celix_status_t serviceReference_getPropertyKeys(service_reference_pt ref, char **keys[], unsigned int *size) {
celix_status_t status = CELIX_SUCCESS;
properties_pt props = NULL;
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/CMakeLists.txt b/pubsub/examples/CMakeLists.txt
index 6703324..91f2efa 100644
--- a/pubsub/examples/CMakeLists.txt
+++ b/pubsub/examples/CMakeLists.txt
@@ -21,6 +21,12 @@ add_subdirectory(mp_pubsub)
find_program(ETCD_CMD NAMES etcd)
find_program(XTERM_CMD NAMES xterm)
+find_package(ZMQ REQUIRED)
+find_package(CZMQ REQUIRED)
+find_package(Jansson REQUIRED)
+
+set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} Celix::dfi)
+
# UDP Multicast
add_celix_container(pubsub_publisher_udp_mc
GROUP pubsub
@@ -34,6 +40,7 @@ add_celix_container(pubsub_publisher_udp_mc
celix_pubsub_poi_publisher
celix_pubsub_poi_publisher2
)
+target_link_libraries(pubsub_publisher_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_subscriber_udp_mc"
GROUP "pubsub"
@@ -46,7 +53,7 @@ add_celix_container("pubsub_subscriber_udp_mc"
Celix::pubsub_admin_udp_multicast
celix_pubsub_poi_subscriber
)
-
+target_link_libraries(pubsub_subscriber_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_subscriber2_udp_mc"
GROUP "pubsub"
BUNDLES
@@ -58,7 +65,7 @@ add_celix_container("pubsub_subscriber2_udp_mc"
Celix::pubsub_admin_udp_multicast
celix_pubsub_poi_subscriber
)
-
+target_link_libraries(pubsub_subscriber2_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
if (ETCD_CMD AND XTERM_CMD)
#Runtime starting a publish and subscriber for udp mc
add_runtime(pubsub_rt_upd_mc
@@ -78,8 +85,8 @@ if (BUILD_PUBSUB_PSA_ZMQ)
# Dynamic ZMQ / UDP admin
add_celix_container("pubsub_publisher"
- GROUP "pubsub"
- BUNDLES
+ GROUP "pubsub"
+ BUNDLES
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
@@ -89,10 +96,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::pubsub_admin_udp_multicast
celix_pubsub_poi_publisher
celix_pubsub_poi_publisher2
- PROPERTIES
+ PROPERTIES
poi1.psa=zmq
poi2.psa=udp
- )
+ )
+ target_link_libraries(pubsub_publisher PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_subscriber"
GROUP "pubsub"
@@ -109,6 +117,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
poi1.psa=zmq
poi2.psa=udp
)
+ target_link_libraries(pubsub_subscriber PRIVATE ${PUBSUB_CONTAINER_LIBS})
# ZMQ
add_celix_container("pubsub_zmq"
@@ -123,6 +132,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
celix_pubsub_poi_publisher
celix_pubsub_poi_subscriber
)
+ target_link_libraries(pubsub_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_publisher_zmq"
GROUP "pubsub"
@@ -138,6 +148,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
PROPERTIES
pubsub.scope=my_small_scope
)
+ target_link_libraries(pubsub_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_subscriber_zmq"
GROUP "pubsub"
@@ -150,6 +161,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::pubsub_admin_zmq
celix_pubsub_poi_subscriber
)
+ target_link_libraries(pubsub_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_subscriber2_zmq"
GROUP "pubsub"
@@ -163,6 +175,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
celix_pubsub_poi_subscriber
)
+ target_link_libraries(pubsub_subscriber2_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
# ZMQ Multipart
add_celix_container("pubsub_mp_subscriber_zmq"
@@ -176,6 +189,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::pubsub_admin_zmq
org.apache.celix.pubsub_subscriber.MpSubscriber
)
+ target_link_libraries(pubsub_mp_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
add_celix_container("pubsub_mp_publisher_zmq"
GROUP "pubsub"
@@ -188,6 +202,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::pubsub_admin_zmq
org.apache.celix.pubsub_publisher.MpPublisher
)
+ target_link_libraries(pubsub_mp_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
if (ETCD_CMD AND XTERM_CMD)
#Runtime starting two bundles using both zmq and upd mc pubsub
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
index a71344a..1e3cef0 100644
--- a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -41,6 +41,7 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
+#include <assert.h>
#include "constants.h"
#include "utils.h"
@@ -69,7 +70,7 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType);
static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
@@ -206,6 +207,34 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
}
+ (*admin)->defaultScore = PSA_UDPMC_DEFAULT_SCORE;
+ (*admin)->qosSampleScore = PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE;
+ (*admin)->qosControlScore = PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE;
+
+ const char *defaultScoreStr = NULL;
+ const char *sampleScoreStr = NULL;
+ const char *controlScoreStr = NULL;
+ bundleContext_getProperty(context, PSA_UDPMC_DEFAULT_SCORE_KEY, &defaultScoreStr);
+ bundleContext_getProperty(context, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+ bundleContext_getProperty(context, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+ if (defaultScoreStr != NULL) {
+ (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+ }
+ if (sampleScoreStr != NULL) {
+ (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+ }
+ if (controlScoreStr != NULL) {
+ (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+ }
+
+ (*admin)->verbose = PSA_UDPMC_DEFAULT_VERBOSE;
+ const char *verboseStr = NULL;
+ bundleContext_getProperty(context, PSA_UDPMC_VERBOSE_KEY, &verboseStr);
+ if (verboseStr != NULL) {
+ (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+ }
+
return status;
}
@@ -307,12 +336,14 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
int i;
pubsub_serializer_service_t *best_serializer = NULL;
- if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
+ if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, NULL)) == CELIX_SUCCESS){
status = pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
}
else{
- printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ if (admin->verbose) {
+ printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ }
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerSubscriptions,subEP);
@@ -381,13 +412,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
- properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- subEP->serviceID,
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
- if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
+ if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
return pubsubAdmin_addAnySubscription(admin,subEP);
}
@@ -397,26 +422,27 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celixThreadMutex_lock(&admin->localPublicationsLock);
celixThreadMutex_lock(&admin->externalPublicationsLock);
- char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
- if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
+ if (factory==NULL && ext_pub_list==NULL) { //No (local or external) publishers yet for this topic
pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
- }
- else{
+ } else {
int i;
topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
- if(subscription == NULL) {
+ if (subscription == NULL) {
pubsub_serializer_service_t *best_serializer = NULL;
- if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
- status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription);
- }
- else{
- printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ const char *serType = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+ status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
+ } else {
+ if (admin->verbose) {
+ printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ }
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerSubscriptions,subEP);
@@ -477,6 +503,19 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celixThreadMutex_unlock(&admin->subscriptionsLock);
celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+ if (admin->verbose) {
+ printf("PSA_UDPMC: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
+
return status;
}
@@ -484,13 +523,20 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
- properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- subEP->serviceID,
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
- char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ if (admin->verbose) {
+ printf("PSA_UDPMC: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
+
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->subscriptionsLock);
topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -522,22 +568,29 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- pubEP->serviceID,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ const char* fwUUID = NULL;
+ bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+ if(fwUUID==NULL){
+ printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
+ return CELIX_INVALID_BUNDLE_CONTEXT;
+ }
- const char* fwUUID = NULL;
+ const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+ bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
- bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
- printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
- }
- char* scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ if (isOwn) {
+ //admin type is unset on the first add, but already holds our own type when a pending (no serializer yet) endpoint is re-added
+ assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL || strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY), PSA_UDPMC_PUBSUB_ADMIN_TYPE) == 0);
+ assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+ }
+
+ if (isOwn) {
+ properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+ }
+
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
- if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+ if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
celixThreadMutex_lock(&admin->localPublicationsLock);
@@ -546,12 +599,15 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
if (factory == NULL) {
topic_publication_pt pub = NULL;
pubsub_serializer_service_t *best_serializer = NULL;
- if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
- status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, admin->mcIpAddress, &pub);
- }
- else{
+ const char* serType = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+ status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub);
+ if (isOwn) {
+ properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+ }
+ } else {
printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerPublications,pubEP);
@@ -565,10 +621,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
}
} else {
- printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- pubEP->serviceID);
+ printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
}
} else {
//just add the new EP to the list
@@ -577,8 +633,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
}
celixThreadMutex_unlock(&admin->localPublicationsLock);
- }
- else{
+ } else {
celixThreadMutex_lock(&admin->externalPublicationsLock);
array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
@@ -630,6 +685,21 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
celixThreadMutex_unlock(&admin->subscriptionsLock);
+ if (admin->verbose) {
+ printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+ isOwn);
+ }
+
return status;
}
@@ -638,11 +708,18 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
celix_status_t status = CELIX_SUCCESS;
int count = 0;
- printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- pubEP->serviceID,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ if (admin->verbose) {
+ printf("PSA_UDPMC: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
const char* fwUUID = NULL;
@@ -651,9 +728,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
}
- char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
- if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
+ if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
celixThreadMutex_lock(&admin->localPublicationsLock);
service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -735,7 +812,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop
printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic);
celixThreadMutex_lock(&admin->localPublicationsLock);
- char* scope_topic =createScopeTopicKey(scope, topic);
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
if(pubsvc_entry!=NULL){
char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
@@ -761,7 +838,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco
printf("PSA_UDP_MC: Closing all subscriptions\n");
celixThreadMutex_lock(&admin->subscriptionsLock);
- char* scope_topic =createScopeTopicKey(scope, topic);
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
if(sub_entry!=NULL){
char* topic = (char*)hashMapEntry_getKey(sub_entry);
@@ -818,7 +895,7 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
celix_status_t status = CELIX_SUCCESS;
- char* scope_topic =createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
if(pendingListPerTopic==NULL){
arrayList_create(&pendingListPerTopic);
@@ -841,7 +918,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
const char *serType = NULL;
serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
if(serType == NULL){
- printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+ printf("PSA_UDPMC: Serializer serviceReference %p has no %s property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
return CELIX_SERVICE_EXCEPTION;
}
@@ -856,7 +933,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
pubsub_serializer_service_t *best_serializer = NULL;
- pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+ pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
if(best_serializer != NULL){ /* Finally we have a valid serializer! */
pubsubAdmin_addSubscription(admin, ep);
}
@@ -865,7 +942,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
pubsub_serializer_service_t *best_serializer = NULL;
- pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+ pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
if(best_serializer != NULL){ /* Finally we have a valid serializer! */
pubsubAdmin_addPublication(admin, ep);
}
@@ -873,7 +950,9 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
- printf("PSA_UDP_MC: %s serializer added\n",serType);
+ if (admin->verbose) {
+ printf("PSA_UDP_MC: %s serializer added\n", serType);
+ }
return status;
}
@@ -886,7 +965,7 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
if(serType == NULL){
- printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+ printf("Serializer serviceReference %p has no %s property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
return CELIX_SERVICE_EXCEPTION;
}
@@ -998,7 +1077,9 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
arrayList_destroy(topicSubList);
}
- printf("PSA_UDP_MC: %s serializer removed\n",serType);
+ if (admin->verbose) {
+ printf("PSA_UDP_MC: %s serializer removed\n", serType);
+ }
return CELIX_SUCCESS;
@@ -1007,24 +1088,41 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
celix_status_t status = CELIX_SUCCESS;
+ const char *fwUuid = NULL;
+ bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
+ if (fwUuid == NULL) {
+ return CELIX_ILLEGAL_STATE;
+ }
+
celixThreadMutex_lock(&admin->serializerListLock);
- status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+ status = pubsub_admin_match(endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList,score);
celixThreadMutex_unlock(&admin->serializerListLock);
return status;
}
/* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
-
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType){
celix_status_t status = CELIX_SUCCESS;
+ pubsub_serializer_service_t *serSvc = NULL;
+ service_reference_pt svcRef = NULL;
+
celixThreadMutex_lock(&admin->serializerListLock);
- status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
+ status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef);
celixThreadMutex_unlock(&admin->serializerListLock);
- return status;
+ if (svcRef != NULL) {
+ bundleContext_getService(admin->bundle_context, svcRef, (void**)&serSvc);
+ bundleContext_ungetService(admin->bundle_context, svcRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+ if (serType != NULL) {
+ serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+ }
+ }
+ *out = serSvc;
+
+ return status;
}
static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
index de4b813..3529a8f 100644
--- a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -27,11 +27,10 @@
#ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_
#define PUBSUB_ADMIN_UDP_MC_IMPL_H_
+#include "pubsub_psa_udpmc_constants.h"
#include "pubsub_admin.h"
#include "log_helper.h"
-#define PUBSUB_ADMIN_TYPE "udp_mc"
-
struct pubsub_admin {
bundle_context_pt bundle_context;
@@ -68,8 +67,13 @@ struct pubsub_admin {
char* mcIpAddress; // The multicast IP address
int sendSocket;
- void* zmq_context; // to be removed
+
+ double qosSampleScore;
+ double qosControlScore;
+ double defaultScore;
+
+ bool verbose;
};
celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h b/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
new file mode 100644
index 0000000..2a02da8
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_PSA_UDPMC_CONSTANTS_H_
+#define PUBSUB_PSA_UDPMC_CONSTANTS_H_
+
+
+#define PSA_UDPMC_PUBSUB_ADMIN_TYPE "udp_mc"
+
+#define PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE 70
+#define PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE 30
+#define PSA_UDPMC_DEFAULT_SCORE 50
+
+#define PSA_UDPMC_QOS_SAMPLE_SCORE_KEY "PSA_UDPMC_QOS_SAMPLE_SCORE"
+#define PSA_UDPMC_QOS_CONTROL_SCORE_KEY "PSA_UDPMC_QOS_CONTROL_SCORE"
+#define PSA_UDPMC_DEFAULT_SCORE_KEY "PSA_UDPMC_DEFAULT_SCORE"
+
+#define PSA_UDPMC_DEFAULT_VERBOSE false
+
+#define PSA_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE"
+
+
+#endif /* PUBSUB_PSA_UDPMC_CONSTANTS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index 3aa2c30..7e9bdbb 100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -40,6 +40,7 @@
#include "large_udp.h"
#include "pubsub_serializer.h"
+#include "pubsub_psa_udpmc_constants.h"
#define EP_ADDRESS_LEN 32
@@ -52,7 +53,10 @@ struct topic_publication {
array_list_pt pub_ep_list; //List<pubsub_endpoint>
hash_map_pt boundServices; //<bundle_pt,bound_service>
celix_thread_mutex_t tp_lock;
- pubsub_serializer_service_t *serializer;
+ struct {
+ const char* type;
+ pubsub_serializer_service_t* svc;
+ } serializer;
struct sockaddr_in destAddr;
};
@@ -92,11 +96,14 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
static void delay_first_send_for_late_joiners(void);
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out){
char* ep = malloc(EP_ADDRESS_LEN);
memset(ep,0,EP_ADDRESS_LEN);
- unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
+
+ long serviceId =strtol(properties_getWithDefault(pubEP->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, "0"), NULL, 10);
+
+ unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, UDP_MAX_PORT);
snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
@@ -112,9 +119,10 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
pub->destAddr.sin_port = htons(port);
- pub->serializer = best_serializer;
+ pub->serializer.type = best_serializer_type;
+ pub->serializer.svc = best_serializer;
- pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+ pubsub_topicPublicationAddPublisherEP(pub, pubEP);
*out = pub;
@@ -138,7 +146,8 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
hashMap_destroy(pub->boundServices,false,false);
pub->svcFactoryReg = NULL;
- pub->serializer = NULL;
+ pub->serializer.svc= NULL;
+ pub->serializer.type= NULL;
if(close(pub->sendSocket) != 0){
status = CELIX_FILE_IO_EXCEPTION;
@@ -167,8 +176,8 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
factory->ungetService = pubsub_topicPublicationUngetService;
properties_pt props = properties_create();
- properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
- properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+ properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
@@ -176,10 +185,10 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
if(status != CELIX_SUCCESS){
properties_destroy(props);
- printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- pubEP->serviceID);
+ printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for scope %s, topic %s (bundle %s).\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID));
}
else{
*svcFactory = factory;
@@ -197,17 +206,19 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
return serviceRegistration_unregister(pub->svcFactoryReg);
}
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
celixThreadMutex_lock(&(pub->tp_lock));
pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
+ pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+ pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
arrayList_add(pub->pub_ep_list,ep);
celixThreadMutex_unlock(&(pub->tp_lock));
return CELIX_SUCCESS;
}
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep) {
celixThreadMutex_lock(&(pub->tp_lock));
arrayList_removeElement(pub->pub_ep_list,ep);
@@ -386,13 +397,13 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
bound->getCount = 1;
celixThreadMutex_create(&bound->mp_lock,NULL);
- if(tp->serializer != NULL){
- tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+ if (tp->serializer.svc != NULL){
+ tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
}
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
- bound->scope=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
- bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ bound->scope=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+ bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
bound->largeUdpHandle = largeUdp_create(1);
bound->service.handle = bound;
@@ -409,8 +420,8 @@ static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service
celixThreadMutex_lock(&boundSvc->mp_lock);
- if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
- boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
+ if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != NULL){
+ boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle, boundSvc->msgTypes);
}
if(boundSvc->scope!=NULL){
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_publication.h b/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
index 8f47deb..e0a5698 100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
@@ -43,10 +43,10 @@ typedef struct pubsub_udp_msg {
} pubsub_udp_msg_t;
typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index 9929437..1451d92 100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -70,7 +70,7 @@ static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut);
static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
@@ -165,8 +165,6 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
(*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
}
- printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
-
// Disable Signal Handling by CZMQ
setenv("ZSYS_SIGHANDLER", "false", true);
@@ -201,6 +199,39 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
}
+
+ (*admin)->defaultScore = PSA_ZMQ_DEFAULT_SCORE;
+ (*admin)->qosSampleScore = PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE;
+ (*admin)->qosControlScore = PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE;
+
+ const char *defaultScoreStr = NULL;
+ const char *sampleScoreStr = NULL;
+ const char *controlScoreStr = NULL;
+ bundleContext_getProperty(context, PSA_ZMQ_DEFAULT_SCORE_KEY, &defaultScoreStr);
+ bundleContext_getProperty(context, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+ bundleContext_getProperty(context, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+ if (defaultScoreStr != NULL) {
+ (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+ }
+ if (sampleScoreStr != NULL) {
+ (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+ }
+ if (controlScoreStr != NULL) {
+ (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+ }
+
+ (*admin)->verbose = PSA_ZMQ_DEFAULT_VERBOSE;
+ const char *verboseStr = NULL;
+ bundleContext_getProperty(context, PSA_ZMQ_VERBOSE_KEY, &verboseStr);
+ if (verboseStr != NULL) {
+ (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+ }
+
+ if ((*admin)->verbose) {
+ printf("PSA ZMQ Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
+ }
+
return status;
}
@@ -302,18 +333,19 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
celixThreadMutex_lock(&admin->subscriptionsLock);
- topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+ topic_subscription_pt any_sub = hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
if(any_sub==NULL){
int i;
pubsub_serializer_service_t *best_serializer = NULL;
- if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
- status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
+ const char *serType = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+ status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, serType, &any_sub);
}
else{
printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerSubscriptions,subEP);
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -378,16 +410,15 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
return status;
}
+/**
+ * A subscriber service is registered and this PSA has won the match
+ * (based on qos or other meta data)
+ * Will update the pubsub endpoint with the chosen pubsub admin and serializer type
+ */
celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
- properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- subEP->serviceID,
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
- if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
+ if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) {
return pubsubAdmin_addAnySubscription(admin,subEP);
}
@@ -397,7 +428,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celixThreadMutex_lock(&admin->localPublicationsLock);
celixThreadMutex_lock(&admin->externalPublicationsLock);
- char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -411,12 +442,13 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
if(subscription == NULL) {
pubsub_serializer_service_t *best_serializer = NULL;
- if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
- status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription);
+ const char *serType = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+ status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription);
}
else{
printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerSubscriptions,subEP);
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -476,6 +508,20 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celixThreadMutex_unlock(&admin->subscriptionsLock);
celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+ if (admin->verbose) {
+ printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
+
return status;
}
@@ -483,12 +529,20 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",
- properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- subEP->serviceID,
- properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
- char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ if (admin->verbose) {
+ printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
+
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->subscriptionsLock);
topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -517,26 +571,45 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
}
+
+/**
+ * A bundle has shown interest in a publisher service and this PSA has won the match
+ * based on filter or embedded topic.properties (extender pattern)
+ * OR !!
+ * A remote publication has been discovered and forwarded to this call
+ * Will update the pubsub endpoint with the chosen pubsub admin and serializer type
+ */
celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
- celix_status_t status = CELIX_SUCCESS;
+ celix_status_t status = CELIX_SUCCESS;
- printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- pubEP->serviceID,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ const char *fwUUID = NULL;
+ bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+ if (fwUUID == NULL) {
+ printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+ return CELIX_INVALID_BUNDLE_CONTEXT;
+ }
- const char* fwUUID = NULL;
+ const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+ bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
- bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
- if (fwUUID == NULL) {
- printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
- }
+ if (isOwn) {
+ //should be null, will be set in this call
+ assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+ assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+ } else {
+ //inverse: ADMIN_TYPE_KEY and SERIALIZER_TYPE_KEY should not be null
+ assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) != NULL);
+ assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
+ }
+
+ if (isOwn) {
+ properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+ }
- char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
- if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) &&
+ char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+ if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) &&
(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
celixThreadMutex_lock(&admin->localPublicationsLock);
@@ -546,12 +619,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
if (factory == NULL) {
topic_publication_pt pub = NULL;
pubsub_serializer_service_t *best_serializer = NULL;
- if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
- status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+ const char *serType = NULL;
+ if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+ status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+ if (isOwn) {
+ properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+ }
}
- else{
+ else {
printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerPublications,pubEP);
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -564,10 +641,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
}
} else {
- printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
- pubEP->serviceID);
+ printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
}
} else {
//just add the new EP to the list
@@ -629,7 +706,24 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
celixThreadMutex_unlock(&admin->subscriptionsLock);
- return status;
+
+ if (admin->verbose) {
+ printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+ isOwn);
+ }
+
+
+ return status;
}
@@ -637,21 +731,29 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
celix_status_t status = CELIX_SUCCESS;
int count = 0;
- printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",
- properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
- pubEP->serviceID,
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ if (admin->verbose) {
+ printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+ }
const char* fwUUID = NULL;
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
if(fwUUID==NULL){
- printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+ fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
}
- char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
- if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
+ if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
celixThreadMutex_lock(&admin->localPublicationsLock);
service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -729,10 +831,12 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_ZMQ: Closing all publications\n");
+ if (admin->verbose) {
+ printf("PSA_ZMQ: Closing all publications\n");
+ }
celixThreadMutex_lock(&admin->localPublicationsLock);
- char *scope_topic = createScopeTopicKey(scope, topic);
+ char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
if(pubsvc_entry!=NULL){
char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
@@ -755,10 +859,12 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *sco
celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
celix_status_t status = CELIX_SUCCESS;
- printf("PSA_ZMQ: Closing all subscriptions\n");
+ if (admin->verbose) {
+ printf("PSA_ZMQ: Closing all subscriptions\n");
+ }
celixThreadMutex_lock(&admin->subscriptionsLock);
- char *scope_topic = createScopeTopicKey(scope, topic);
+ char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
if(sub_entry!=NULL){
char* topic = (char*)hashMapEntry_getKey(sub_entry);
@@ -814,7 +920,7 @@ static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip)
static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
celix_status_t status = CELIX_SUCCESS;
- char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
if(pendingListPerTopic==NULL){
arrayList_create(&pendingListPerTopic);
@@ -834,8 +940,8 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
const char *serType = NULL;
serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
- if(serType == NULL){
- printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+ if (serType == NULL) {
+ fprintf(stderr, "WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
return CELIX_SERVICE_EXCEPTION;
}
@@ -850,7 +956,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
pubsub_serializer_service_t *best_serializer = NULL;
- pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+ pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
if(best_serializer != NULL){ /* Finally we have a valid serializer! */
pubsubAdmin_addSubscription(admin, ep);
}
@@ -859,7 +965,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
pubsub_serializer_service_t *best_serializer = NULL;
- pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+ pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
if(best_serializer != NULL){ /* Finally we have a valid serializer! */
pubsubAdmin_addPublication(admin, ep);
}
@@ -867,7 +973,9 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
- printf("PSA_ZMQ: %s serializer added\n",serType);
+ if (admin->verbose) {
+ printf("PSA_ZMQ: %s serializer added\n", serType);
+ }
return status;
}
@@ -879,8 +987,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
const char *serType = NULL;
serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
- if(serType == NULL){
- printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+ if (serType == NULL) {
+ printf("WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
return CELIX_SERVICE_EXCEPTION;
}
@@ -994,9 +1102,9 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
}
-
- printf("PSA_ZMQ: %s serializer removed\n",serType);
-
+ if (admin->verbose) {
+ printf("PSA_ZMQ: %s serializer removed\n", serType);
+ }
return CELIX_SUCCESS;
}
@@ -1004,24 +1112,42 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
celix_status_t status = CELIX_SUCCESS;
+ const char *fwUuid = NULL;
+ bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
+ if (fwUuid == NULL) {
+ return CELIX_ILLEGAL_STATE;
+ }
+
celixThreadMutex_lock(&admin->serializerListLock);
- status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+ status = pubsub_admin_match(endpoint, PSA_ZMQ_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, score);
celixThreadMutex_unlock(&admin->serializerListLock);
return status;
}
/* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut) {
+ celix_status_t status = CELIX_SUCCESS;
- celix_status_t status = CELIX_SUCCESS;
+ pubsub_serializer_service_t *serSvc = NULL;
+ service_reference_pt svcRef = NULL;
- celixThreadMutex_lock(&admin->serializerListLock);
- status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
- celixThreadMutex_unlock(&admin->serializerListLock);
+ celixThreadMutex_lock(&admin->serializerListLock);
+ status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+
+ if (svcRef != NULL) {
+ bundleContext_getService(admin->bundle_context, svcRef, (void**)&serSvc);
+ bundleContext_ungetService(admin->bundle_context, svcRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+ if (serTypeOut != NULL) {
+ serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, serTypeOut);
+ }
+ }
- return status;
+ *svcOut = serSvc;
+
+ return status;
}
static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
@@ -1057,3 +1183,4 @@ static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topi
celixThreadMutex_unlock(&admin->usedSerializersLock);
}
+
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
index 040a0d3..c788382 100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
@@ -37,18 +37,12 @@
#undef LOG_INFO
#undef LOG_WARNING
+#include "pubsub_psa_zmq_constants.h"
#include "pubsub_admin.h"
#include "pubsub_admin_match.h"
#include "log_helper.h"
#include "command.h"
-#define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
-#define PSA_ZMQ_MAX_PORT "PSA_ZMQ_MAX_PORT"
-
-#define PSA_ZMQ_DEFAULT_BASE_PORT 5501
-#define PSA_ZMQ_DEFAULT_MAX_PORT 6000
-
-#define PUBSUB_ADMIN_TYPE "zmq"
struct pubsub_admin {
@@ -56,8 +50,8 @@ struct pubsub_admin {
log_helper_pt loghelper;
/* List of the available serializers */
- celix_thread_mutex_t serializerListLock; // List<serializers>
- array_list_pt serializerList;
+ celix_thread_mutex_t serializerListLock;
+ array_list_pt serializerList; // List<serializers service references>
celix_thread_mutex_t localPublicationsLock;
hash_map_pt localPublications;//<topic(string),service_factory_pt>
@@ -91,6 +85,12 @@ struct pubsub_admin {
command_service_t shellCmdService;
service_registration_pt shellCmdReg;
+
+ double qosSampleScore;
+ double qosControlScore;
+ double defaultScore;
+
+ bool verbose;
};
celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
new file mode 100644
index 0000000..211439e
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -0,0 +1,48 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+
+#ifndef PUBSUB_PSA_ZMQ_CONSTANTS_H_
+#define PUBSUB_PSA_ZMQ_CONSTANTS_H_
+
+
+
+#define PSA_ZMQ_PUBSUB_ADMIN_TYPE "zmq"
+
+#define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
+#define PSA_ZMQ_MAX_PORT "PSA_ZMQ_MAX_PORT"
+
+#define PSA_ZMQ_DEFAULT_BASE_PORT 5501
+#define PSA_ZMQ_DEFAULT_MAX_PORT 6000
+
+#define PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE 30
+#define PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE 70
+#define PSA_ZMQ_DEFAULT_SCORE 30
+
+#define PSA_ZMQ_QOS_SAMPLE_SCORE_KEY "PSA_ZMQ_QOS_SAMPLE_SCORE"
+#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
+#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
+
+#define PSA_ZMQ_DEFAULT_VERBOSE false
+#define PSA_ZMQ_VERBOSE_KEY "PSA_ZMQ_VERBOSE"
+
+
+
+#endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_publication.c b/pubsub/pubsub_admin_zmq/src/topic_publication.c
index 873cec2..c81107d 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@ -45,6 +45,7 @@
#include "topic_publication.h"
#include "pubsub_serializer.h"
+#include "pubsub_psa_zmq_constants.h"
#ifdef BUILD_WITH_ZMQ_SECURITY
#include "zmq_crypto.h"
@@ -65,7 +66,10 @@ struct topic_publication {
service_registration_pt svcFactoryReg;
array_list_pt pub_ep_list; //List<pubsub_endpoint>
hash_map_pt boundServices; //<bundle_pt,bound_service>
- pubsub_serializer_service_t *serializer;
+ struct {
+ const char* type;
+ pubsub_serializer_service_t *svc;
+ } serializer;
celix_thread_mutex_t tp_lock;
};
@@ -109,7 +113,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
static void delay_first_send_for_late_joiners(void);
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
celix_status_t status = CELIX_SUCCESS;
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -214,7 +218,8 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
pub->endpoint = ep;
pub->zmq_socket = socket;
- pub->serializer = best_serializer;
+ pub->serializer.svc = best_serializer;
+ pub->serializer.type = serType;
celixThreadMutex_create(&(pub->socket_lock),NULL);
@@ -248,7 +253,8 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
hashMap_destroy(pub->boundServices,false,false);
pub->svcFactoryReg = NULL;
- pub->serializer = NULL;
+ pub->serializer.svc = NULL;
+ pub->serializer.type = NULL;
#ifdef BUILD_WITH_ZMQ_SECURITY
zcert_destroy(&(pub->zmq_cert));
#endif
@@ -282,16 +288,17 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
factory->ungetService = pubsub_topicPublicationUngetService;
properties_pt props = properties_create();
- properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
- properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
+ properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
if(status != CELIX_SUCCESS){
properties_destroy(props);
- printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",
- properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),pubEP->serviceID);
+ printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %s).\n",
+ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+ properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
}
else{
*svcFactory = factory;
@@ -309,9 +316,11 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
return serviceRegistration_unregister(pub->svcFactoryReg);
}
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
celixThreadMutex_lock(&(pub->tp_lock));
+ pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+ pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
arrayList_add(pub->pub_ep_list,ep);
celixThreadMutex_unlock(&(pub->tp_lock));
@@ -574,14 +583,14 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
bound->mp_send_in_progress = false;
celixThreadMutex_create(&bound->mp_lock,NULL);
- if(tp->serializer != NULL){
- tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+ if(tp->serializer.svc != NULL){
+ tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
}
arrayList_create(&bound->mp_parts);
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
- bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+ bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
bound->service.handle = bound;
bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
@@ -600,8 +609,8 @@ static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service
celixThreadMutex_lock(&boundSvc->mp_lock);
- if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
- boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
+ if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != NULL){
+ boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle, boundSvc->msgTypes);
}
if(boundSvc->mp_parts!=NULL){
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_publication.h b/pubsub/pubsub_admin_zmq/src/topic_publication.h
index 65df0e3..20e4a8e 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.h
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.h
@@ -35,7 +35,7 @@
typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
index 8ff94d0..46a1688 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -75,7 +75,10 @@ struct topic_subscription{
celix_thread_mutex_t ts_lock;
bundle_context_pt context;
- pubsub_serializer_service_t *serializer;
+ struct {
+ const char* type;
+ pubsub_serializer_service_t *svc;
+ } serializer;
hash_map_pt servicesMap; // key = service, value = msg types map
@@ -134,7 +137,7 @@ static unsigned int get_zmq_receive_timeout(bundle_context_pt context) {
return timeout;
}
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, const char *serType, topic_subscription_pt* out){
celix_status_t status = CELIX_SUCCESS;
#ifdef BUILD_WITH_ZMQ_SECURITY
@@ -202,7 +205,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
ts->zmq_socket = zmq_s;
ts->running = false;
ts->nrSubscribers = 0;
- ts->serializer = best_serializer;
+ ts->serializer.svc = best_serializer;
ts->zmqReceiveTimeout = get_zmq_receive_timeout(bundle_context);
#ifdef BUILD_WITH_ZMQ_SECURITY
ts->zmq_cert = sub_cert;
@@ -419,8 +422,8 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
serviceReference_getBundle(reference, &bundle);
- if(ts->serializer != NULL && bundle!=NULL){
- ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+ if(ts->serializer.svc != NULL && bundle!=NULL){
+ ts->serializer.svc->createSerializerMap(ts->serializer.svc->handle,bundle,&msgTypes);
if(msgTypes != NULL){
hashMap_put(ts->servicesMap, service, msgTypes);
printf("PSA_ZMQ_TS: New subscriber registered.\n");
@@ -443,8 +446,8 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
celixThreadMutex_lock(&ts->ts_lock);
if (hashMap_containsKey(ts->servicesMap, service)) {
hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
- if(msgTypes!=NULL && ts->serializer!=NULL){
- ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+ if(msgTypes!=NULL && ts->serializer.svc!=NULL){
+ ts->serializer.svc->destroySerializerMap(ts->serializer.svc->handle,msgTypes);
printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
}
else{
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.h b/pubsub/pubsub_admin_zmq/src/topic_subscription.h
index 7267103..6dca4e5 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.h
@@ -38,7 +38,7 @@
typedef struct topic_subscription* topic_subscription_pt;
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, const char* serType, topic_subscription_pt* out);
celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
[3/3] celix git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/celix into develop
Posted by pn...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/celix into develop
This merge favors the refactored versions of pubsub_endpoint.c and pubsub_utils.c.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/109edf4d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/109edf4d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/109edf4d
Branch: refs/heads/develop
Commit: 109edf4d08c20a120f04d733bab8b4a563c88d60
Parents: b8f1387 a55fd15
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Feb 20 13:38:38 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Feb 20 13:38:38 2018 +0100
----------------------------------------------------------------------
etcdlib/test/etcdlib_test.c | 126 +++++++++++++-----------
pubsub/pubsub_discovery/src/etcd_watcher.c | 2 +-
pubsub/pubsub_spi/src/pubsub_endpoint.c | 2 +-
pubsub/test/CMakeLists.txt | 32 +++---
4 files changed, 87 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/109edf4d/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/109edf4d/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_spi/src/pubsub_endpoint.c
index 1950433,11041f2..24123b7
--- a/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@@ -323,38 -285,3 +323,38 @@@ char * pubsubEndpoint_createScopeTopicK
return result;
}
+
+
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp) { /* Returns true when psEp->endpoint_props holds every required key; otherwise logs the problem to stderr and returns false. psEp is dereferenced without a NULL check. */
+ //required properties: every key listed below must be present in psEp->endpoint_props
+ bool valid = true;
+ static const char* keys[] = {
+ PUBSUB_ENDPOINT_UUID,
+ PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+ PUBSUB_ENDPOINT_TYPE,
+ PUBSUB_ENDPOINT_TOPIC_NAME,
+ PUBSUB_ENDPOINT_TOPIC_SCOPE,
+ NULL }; // the NULL sentinel ends the list for the loop below
+ int i;
+ for (i = 0; keys[i] != NULL; ++i) {
+ const char *val = properties_get(psEp->endpoint_props, keys[i]);
+ if (val == NULL) { //missing required key; the loop keeps going so every missing key is reported
+ fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint missing key: '%s'\n", keys[i]);
+ valid = false;
+ }
+ }
+ if (!valid) { // on failure, dump the properties to stderr to help diagnose which entries are present
+ const char *key = NULL;
+ fprintf(stderr, "PubSubEndpoint entries:\n");
+ PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+ fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
+ }
+ if (psEp->topic_props != NULL) { // topic properties are dumped only when they are set
+ fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
+ PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+ fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
+ }
+ }
+ }
+ return valid;
- }
++}