You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:27 UTC
[10/34] celix git commit: CELIX-454: More PubSub. The UDPMC is now
somewhat working again, but still needs some testing.
CELIX-454: More PubSub. The UDPMC is now somewhat working again, but still needs some testing.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2eb219e3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2eb219e3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2eb219e3
Branch: refs/heads/develop
Commit: 2eb219e393cfafd47a99739da3e56e0ab882799a
Parents: 69596cf
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Wed Sep 26 21:13:50 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Wed Sep 26 21:13:50 2018 +0200
----------------------------------------------------------------------
.../pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 2 +
.../pubsub_admin_udp_mc/src/psa_activator.c | 6 +-
.../src/pubsub_udpmc_admin.c | 295 +++++++++-
.../src/pubsub_udpmc_admin.h | 47 +-
.../src/pubsub_udpmc_common.c | 40 ++
.../src/pubsub_udpmc_common.h | 39 ++
.../src/pubsub_udpmc_topic_receiver.c | 450 +++++++++++++++
.../src/pubsub_udpmc_topic_receiver.h | 45 ++
.../src/pubsub_udpmc_topic_sender.c | 305 +++++++++-
.../src/pubsub_udpmc_topic_sender.h | 10 +-
.../pubsub_admin_udp_mc/src/topic_publication.c | 2 +-
.../pubsub_admin_zmq/src/topic_subscription.c | 2 +-
.../pubsub/pubsub_discovery/src/psd_activator.c | 19 +-
.../src/pubsub_discovery_impl.c | 32 +-
.../src/pubsub_discovery_impl.h | 8 +-
.../pubsub/pubsub_spi/include/pubsub_admin.h | 2 +-
.../pubsub/pubsub_spi/include/pubsub_common.h | 2 +-
.../pubsub/pubsub_spi/include/pubsub_utils.h | 5 +-
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 9 +-
.../pubsub/pubsub_spi/src/pubsub_utils_match.c | 33 +-
.../src/pubsub_topology_manager.c | 565 +++++++++++--------
.../src/pubsub_topology_manager.h | 15 +-
libs/framework/include/celix_bundle_context.h | 2 +-
23 files changed, 1576 insertions(+), 359 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 94ce5cc..3f74376 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -25,6 +25,8 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast
src/psa_activator.c
src/pubsub_udpmc_admin.c
src/pubsub_udpmc_topic_sender.c
+ src/pubsub_udpmc_topic_receiver.c
+ src/pubsub_udpmc_common.c
#src/pubsub_admin_impl.c
#src/topic_subscription.c
#src/topic_publication.c
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index 406720e..682efc5 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -59,8 +59,8 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
- psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReciever;
- psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReciever;
+ psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReceiver;
+ psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReceiver;
psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
@@ -78,7 +78,7 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc");
celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc");
celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA");
- celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
}
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 650e439..3e06334 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -1,3 +1,21 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
#include <memory.h>
#include <sys/socket.h>
@@ -11,8 +29,11 @@
#include "pubsub_udpmc_admin.h"
#include "pubsub_psa_udpmc_constants.h"
#include "pubsub_udpmc_topic_sender.h"
+#include "pubsub_udpmc_topic_receiver.h"
#define PUBSUB_UDPMC_MC_IP_DEFAULT "224.100.1.1"
+#define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address"
+#define PUBSUB_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port"
#define LOG_DEBUG(...) \
logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -23,8 +44,46 @@
#define LOG_ERROR(...) \
logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+struct pubsub_udpmc_admin {
+ celix_bundle_context_t *ctx;
+ log_helper_t *log; // Used by the LOG_* macros in this file (they expand to logHelper_log(psa->log, ...))
+ char *ifIpAddress; // The local interface which is used for multicast communication
+ char *mcIpAddress; // The multicast IP address
+ int sendSocket; // Passed to every topic sender created in pubsub_udpmcAdmin_setupTopicSender
+ double qosSampleScore;
+ double qosControlScore;
+ double defaultScore;
+ bool verbose;
+ const char *fwUUID; // Passed to pubsubEndpoint_create for the endpoints this admin creates
+
+ struct {
+ celix_thread_mutex_t mutex; // guards map
+ hash_map_t *map; //key = scope:topic key, value = pubsub_updmc_topic_sender_t*
+ } topicSenders;
+
+ struct {
+ celix_thread_mutex_t mutex; // guards map
+ hash_map_t *map; //key = scope:topic key, value = pubsub_updmc_topic_receiver_t*
+ } topicReceivers;
+
+ struct {
+ celix_thread_mutex_t mutex; // guards map
+ hash_map_t *map; //key = endpoint uuid (string owned by the value), value = celix_properties_t* (copy of the discovered endpoint)
+ } discoveredEndpoints;
+
+};
+
+typedef struct psa_udpmc_connected_endpoint { // NOTE(review): not referenced by the code visible in this commit - confirm it is still needed
+ void *sender; //if connected endpoint is subscriber. TODO: replace void* with the concrete sender type
+ void *receiver; //if connected endpoint is publisher. TODO: replace void* with the concrete receiver type
+ char *endpointUUID;
+} psa_udpmc_connected_endpoint_t;
+
static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa));
@@ -115,8 +174,8 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- celixThreadMutex_create(&psa->connectedEndpoints.mutex, NULL);
- psa->connectedEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+ psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
return psa;
}
@@ -128,14 +187,38 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
//note assuming al psa register services and service tracker are removed.
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcTopicSender_destroy(sender);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcTopicReceiver_destroy(recv);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+ celix_properties_destroy(ep);
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
celixThreadMutex_destroy(&psa->topicReceivers.mutex);
hashMap_destroy(psa->topicReceivers.map, true, false);
- celixThreadMutex_destroy(&psa->connectedEndpoints.mutex);
- hashMap_destroy(psa->connectedEndpoints.map, true, false);
+ celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+ hashMap_destroy(psa->discoveredEndpoints.map, true, false);
free(psa->mcIpAddress);
free(psa->ifIpAddress);
@@ -165,14 +248,13 @@ celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderB
return status;
}
-celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) {
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) { // Reports through outMatch whether the endpoint matches this admin; always returns CELIX_SUCCESS
pubsub_udpmc_admin_t *psa = handle;
LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, NULL);
- if (outScore != NULL) {
- *outScore = score;
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL); // the decision itself is delegated to pubsub_utils_matchEndpoint
+ if (outMatch != NULL) { // outMatch is optional; the result is dropped when it is NULL
+ *outMatch = match;
}
return status;
}
@@ -192,11 +274,18 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
- sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId);
+ sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, psa->sendSocket, psa->mcIpAddress);
const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+ celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
+ celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+ if (cn != NULL) {
+ celix_properties_set(newEndpoint, "container_name", cn);
+ }
bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
if (!valid) {
LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
@@ -205,7 +294,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
free(key);
} else {
hashMap_put(psa->topicSenders.map, key, sender);
- //TODO connect endpoints to sender
+ //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender?
}
} else {
free(key);
@@ -235,7 +324,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
char *mapKey = hashMapEntry_getKey(entry);
pubsub_updmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
free(mapKey);
- //TODO disconnect endpoints to sender
+ //TODO disconnect endpoints to sender. note is this needed for a udpmc topic sender?
pubsub_udpmcTopicSender_destroy(sender);
} else {
LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
@@ -246,30 +335,175 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_setupTopicReciever. scope/topic: %s/%s", scope, topic);
+ // Creates a TopicReceiver for scope/topic, stores it under its scope:topic key and returns the new subscriber endpoint via outSubscriberEndpoint.
+ celix_properties_t *newEndpoint = NULL;
+ // key becomes the map key when the receiver is stored; every other path frees it here.
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+ if (receiver == NULL) {
+ receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId);
+ const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
+ const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+
+ //if available also set container name
+ const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
+ if (cn != NULL) {
+ celix_properties_set(newEndpoint, "container_name", cn);
+ }
+ bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
+ if (!valid) {
+ LOG_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid");
+ celix_properties_destroy(newEndpoint);
+ newEndpoint = NULL; // the endpoint is gone; never hand the destroyed properties to the caller below
+ pubsub_udpmcTopicReceiver_destroy(receiver);
+ free(key);
+ } else {
+ hashMap_put(psa->topicReceivers.map, key, receiver);
+ // Connect the new receiver to every endpoint that was discovered before it existed.
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ }
+ } else {
+ free(key);
+ LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+ *outSubscriberEndpoint = newEndpoint;
+ }
+
celix_status_t status = CELIX_SUCCESS;
return status;
}
-celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) {
+celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_teardownTopicReciever. scope/topic: %s/%s", scope, topic);
+ // Removes and destroys the TopicReceiver stored for scope/topic; does nothing when no such receiver exists.
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+ free(key);
+ if (entry != NULL) {
+ char *receiverKey = hashMapEntry_getKey(entry);
+ pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+ hashMap_remove(psa->topicReceivers.map, receiverKey);
+ // The stored key is the string allocated in setupTopicReceiver, so it is released with the receiver.
+ free(receiverKey);
+ pubsub_udpmcTopicReceiver_destroy(receiver);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex); // release the lock taken above so later calls are not blocked
+
celix_status_t status = CELIX_SUCCESS;
return status;
}
+static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+ //note can be called with discoveredEndpoint.mutex lock
+ celix_status_t status = CELIX_SUCCESS;
+ // Connects receiver to endpoint when endpoint is a publisher for the receiver's scope/topic; other endpoints are left alone.
+ const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+ const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+ // Endpoint properties, looked up with NULL (or -1 for the port) as the fallback value.
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+ long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
+ // Returns CELIX_BUNDLE_EXCEPTION when type, socket address or port is missing, CELIX_SUCCESS otherwise.
+ if (type == NULL || sockAdress == NULL || sockPort < 0) {
+ fprintf(stderr, "[PSA UDPMC] Error got endpoint without udpmc socket address/port or endpoint type\n");
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ if (eScope != NULL && eTopic != NULL &&
+ strncmp(eScope, scope, 1024 * 1024) == 0 &&
+ strncmp(eTopic, topic, 1024 * 1024) == 0 &&
+ strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ pubsub_udpmcTopicReceiver_connectTo(receiver, sockAdress, sockPort);
+ }
+ }
+
+ return status;
+}
+
celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_addEndpoint");
+ celixThreadMutex_lock(&psa->topicReceivers.mutex); // first offer the endpoint to every existing receiver
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ // Then keep a private copy so that receivers created later can still be connected (see setupTopicReceiver).
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ celix_properties_t *cpy = celix_properties_copy(endpoint);
+ const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL); // taken from cpy, so the key string lives as long as the stored value
+ hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy); // NOTE(review): uuid is not checked for NULL and an earlier entry with the same uuid is not destroyed - confirm callers rule both out
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
celix_status_t status = CELIX_SUCCESS;
return status;
}
+
+static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) {
+ //note can be called with discoveredEndpoint.mutex lock
+ celix_status_t status = CELIX_SUCCESS;
+ // Disconnects receiver from endpoint when endpoint is a publisher for the receiver's scope/topic; other endpoints are left alone.
+ const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+ const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
+ // Endpoint properties, looked up with NULL (or -1 for the port) as the fallback value.
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
+ const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+ const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+ long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
+ // Returns CELIX_BUNDLE_EXCEPTION when type, socket address or port is missing, CELIX_SUCCESS otherwise.
+ if (type == NULL || sockAdress == NULL || sockPort < 0) {
+ fprintf(stderr, "[PSA UDPMC] Error got endpoint without udpmc socket address/port or endpoint type\n");
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ if (eScope != NULL && eTopic != NULL &&
+ strncmp(eScope, scope, 1024 * 1024) == 0 &&
+ strncmp(eTopic, topic, 1024 * 1024) == 0 &&
+ strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+ pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort);
+ }
+ }
+
+ return status;
+}
+
celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_udpmc_admin_t *psa = handle;
- LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_removeEndpoint");
+ celixThreadMutex_lock(&psa->topicReceivers.mutex); // first let every receiver drop its connection to this endpoint
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ // Then forget the copy that addEndpoint stored under the endpoint's uuid.
+ celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); // NOTE(review): not checked for NULL before it is used as lookup key - confirm callers always set it
+ celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid); // may be NULL, checked below
+ celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ // Destroyed after the unlock; found also owns the string that addEndpoint used as the map key.
+ if (found != NULL) {
+ celix_properties_destroy(found);
+ }
+
celix_status_t status = CELIX_SUCCESS;
return status;
}
@@ -278,7 +512,8 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
pubsub_udpmc_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- fprintf(out, "\nTopic Senders:\n");
+ fprintf(out, "\n");
+ fprintf(out, "Topic Senders:\n");
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -287,16 +522,32 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine
const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
const char *scope = pubsub_udpmcTopicSender_scope(sender);
const char *topic = pubsub_udpmcTopicSender_topic(sender);
- const char *url = pubsub_udpmcTopicSender_socketAddress(sender);
+ const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender);
+ fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+ fprintf(out, " |- psa type = %s\n", psaType);
+ fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- socket address = %s\n", sockAddr);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ fprintf(out, "\n");
+ fprintf(out, "\nTopic Receivers:\n");
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver);
+ const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver);
+ const char *scope = pubsub_udpmcTopicReceiver_scope(receiver);
+ const char *topic = pubsub_udpmcTopicReceiver_topic(receiver);
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
fprintf(out, " |- psa type = %s\n", psaType);
fprintf(out, " |- serializer type = %s\n", serType);
- fprintf(out, " |- url = %s\n", url);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
fprintf(out, "\n");
- //TODO topic receivers
+ //TODO topic receivers/senders connection count
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 011d272..11c7e13 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -23,8 +23,9 @@
#include "celix_api.h"
#include "log_helper.h"
-#define PUBSUB_UDPMC_ADMIN_TYPE "udpmc"
-#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "pubsub.udpmc.socket_address"
+#define PUBSUB_UDPMC_ADMIN_TYPE "udp_mc"
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address"
+#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port"
#define PUBSUB_UDPMC_IP_KEY "PSA_IP"
#define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE"
@@ -34,54 +35,20 @@
#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100"
#define PUBSUB_UDPMC_VERBOSE_DEFAULT "true"
-
-typedef struct pubsub_udpmc_admin {
- celix_bundle_context_t *ctx;
- log_helper_t *log;
- char* ifIpAddress; // The local interface which is used for multicast communication
- char* mcIpAddress; // The multicast IP address
- int sendSocket;
- double qosSampleScore;
- double qosControlScore;
- double defaultScore;
- bool verbose;
- const char *fwUUID;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t
- } topicSenders;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t
- } topicReceivers;
-
- struct {
- celix_thread_mutex_t mutex;
- hash_map_t *map; //key = endpoint uuid, value = psa_udpmc_connected_endpoint_entry_t
- } connectedEndpoints;
-
-} pubsub_udpmc_admin_t;
-
-typedef struct psa_udpmc_connected_endpoint {
- void *sender; //if connected endpoint is subscriber. todo type
- void *receiver; //if connected endpoint is publisher. TODO type
- char *endpointUUID;
-} psa_udpmc_connected_endpoint_t;
+typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
-celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
new file mode 100644
index 0000000..a039e6c
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
@@ -0,0 +1,40 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_udpmc_common.h"
+
+int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
+ *msgTypeId = utils_stringHash(msgType); // the id is the string hash of the message type name; handle is not used
+ return 0; // no failure path: the result is always 0
+}
+
+bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_msg_header_t *hdr) {
+    /* A header is accepted when its major equals the local major and its minor is not older than the local minor. */
+    int localMajor = 0;
+    int localMinor = 0;
+    if (msgVersion == NULL) {
+        return false; /* nothing to compare the header against */
+    }
+    version_getMajor(msgVersion, &localMajor);
+    version_getMinor(msgVersion, &localMinor);
+    if (hdr->major != ((unsigned char)localMajor)) {
+        return false; /* different major means incompatible */
+    }
+    return hdr->minor >= ((unsigned char)localMinor); /* equal or greater minor is a compatible update */
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
new file mode 100644
index 0000000..e49aaa7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_UDPMC_COMMON_H
+#define CELIX_PUBSUB_UDPMC_COMMON_H
+
+#include <utils.h>
+
+#include "version.h"
+#include "pubsub_common.h"
+
+typedef struct pubsub_udp_msg {
+ struct pubsub_msg_header header;
+ unsigned int payloadSize; // presumably the number of bytes stored in payload - TODO confirm against the sender/receiver code
+ char payload[]; // flexible array member: the data directly follows the fixed fields
+} pubsub_udp_msg_t;
+
+int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId);
+
+bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_msg_header_t *hdr);
+
+
+#endif //CELIX_PUBSUB_UDPMC_COMMON_H
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
new file mode 100644
index 0000000..f3e320a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -0,0 +1,450 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <errno.h>
+#include <memory.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+
+#include <pubsub_serializer.h>
+#include <pubsub/subscriber.h>
+#include <pubsub_constants.h>
+#include <pubsub_endpoint.h>
+
+#include "pubsub_udpmc_topic_receiver.h"
+#include "pubsub_psa_udpmc_constants.h"
+#include "large_udp.h"
+#include "pubsub_udpmc_common.h"
+
+#define MAX_EPOLL_EVENTS 10 //maximum number of events fetched per epoll_wait call in the receive thread
+#define RECV_THREAD_TIMEOUT 5 //epoll_wait timeout in seconds (multiplied by 1000 at the call site)
+#define UDP_BUFFER_SIZE 65535 //NOTE(review): not referenced anywhere in this file
+#define MAX_UDP_SESSIONS 16 //argument passed to largeUdp_create when the receiver is created
+
+struct pubsub_updmc_topic_receiver { //state of one UDP multicast topic receiver
+ celix_bundle_context_t *ctx; //bundle context used to create and stop the service trackers
+ char *scope; //copy of the scope given at creation
+ char *topic; //copy of the topic given at creation
+ char* ifIpAddress; //copy of the interface IP given at creation; used as imr_interface when joining a multicast group
+ largeUdp_pt largeUdpHandle; //used by the receive thread to check for and read complete messages
+ int topicEpollFd; // epoll file descriptor where the receive sockets are registered.
+
+ //serializer svc
+ long serializerTrackerId;
+ struct {
+
+ celix_thread_mutex_t mutex; //protect svc
+ pubsub_serializer_service_t *svc; //set by the serializer tracker callback
+ const celix_properties_t *props; //service properties of svc; the serializer type is read from these
+ } serializer;
+
+ struct {
+ celix_thread_t thread;
+ celix_thread_mutex_t mutex; //protects running
+ bool running; //set to false by destroy to make the receive thread leave its loop
+ } recvThread;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = "<socketAddress>:<socketPort>" string (owned by the entry), value = psa_udpmc_requested_connection_entry_t*
+ } requestedConnections;
+
+ long subscriberTrackerId;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = bnd id, value = psa_udpmc_subscriber_entry_t*
+ } subscribers;
+};
+
+typedef struct psa_udpmc_requested_connection_entry { //one multicast address/port the receiver was asked to connect to
+ char *key; //"<socketAddress>:<socketPort>"; also used as the key in the requestedConnections map
+ char *socketAddress; //copy of the multicast address
+ long socketPort; //port the receive socket is bound to
+ bool connected; //true once the socket is created, joined, bound and registered at the epoll fd
+ int recvSocket; //UDP socket created in connectTo
+} psa_udpmc_requested_connection_entry_t;
+
+typedef struct psa_udpmc_subscriber_entry { //per-bundle subscriber bookkeeping, stored in the subscribers map
+ int usageCount; //incremented per added subscriber service of the bundle, decremented on removal
+ hash_map_t *msgTypes; //map from serializer svc
+ pubsub_subscriber_t *svc; //subscriber service of the first add for this bundle; its receive callback gets the messages
+} psa_udpmc_subscriber_entry_t;
+
+
+typedef struct pubsub_msg{ //NOTE(review): not referenced anywhere else in this file
+ pubsub_msg_header_t *header;
+ char* payload;
+ unsigned int payloadSize;
+} pubsub_msg_t;
+
+static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg);
+static void* psa_udpmc_recvThread(void * data);
+
+
+/**
+ * Creates a UDP multicast topic receiver for the given scope and topic.
+ *
+ * Copies scope, topic and ifIP, creates the large UDP handle and the epoll instance, starts a
+ * tracker for the serializer service with id serializerSvcId and a tracker for subscriber services
+ * of the topic, and finally starts the receive thread.
+ *
+ * @param ctx              Bundle context used for the service trackers.
+ * @param scope            Scope of the topic (copied).
+ * @param topic            Name of the topic (copied).
+ * @param ifIP             IP of the interface used when joining multicast groups (copied).
+ * @param serializerSvcId  Service id of the serializer service to track.
+ * @return The new receiver, or NULL if the receiver struct could not be allocated.
+ *         Release it with pubsub_udpmcTopicReceiver_destroy.
+ */
+pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+        const char *scope,
+        const char *topic,
+        const char *ifIP,
+        long serializerSvcId) {
+    pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+    if (receiver == NULL) {
+        //nothing has been set up yet, so there is nothing to roll back
+        return NULL;
+    }
+
+    receiver->ctx = ctx;
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+    receiver->ifIpAddress = strndup(ifIP, 1024 * 1024);
+    receiver->recvThread.running = true; //must be set before the receive thread is started
+    receiver->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
+    receiver->topicEpollFd = epoll_create1(0);
+
+    celixThreadMutex_create(&receiver->serializer.mutex, NULL);
+    celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+    celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+    celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
+
+    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    //track serializer svc based on the provided serializerSvcId
+    {
+        char filter[64];
+        snprintf(filter, sizeof(filter), "(service.id=%li)", serializerSvcId);
+
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = receiver;
+        opts.setWithProperties = pubsub_udpmcTopicReceiver_setSerializer;
+        receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //track subscribers
+    {
+        //first snprintf only measures the filter, the second one writes it
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+        char buf[size+1];
+        snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
+
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+        opts.filter.filter = buf;
+        opts.callbackHandle = receiver;
+        opts.addWithOwner = pubsub_udpmcTopicReceiver_addSubscriber;
+        opts.removeWithOwner = pubsub_udpmcTopicReceiver_removeSubscriber;
+
+        receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    celixThread_create(&receiver->recvThread.thread, NULL, psa_udpmc_recvThread, receiver);
+
+    return receiver;
+}
+
+/**
+ * Stops and destroys a topic receiver; a NULL receiver is ignored.
+ *
+ * Stops both service trackers, stops and joins the receive thread and then releases everything the
+ * receiver owns: the requested connections (including their sockets), both maps, the epoll file
+ * descriptor, the large UDP handle and the copied strings.
+ *
+ * @param receiver The receiver to destroy, may be NULL.
+ */
+void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) {
+    if (receiver == NULL) {
+        return;
+    }
+
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId);
+    celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
+
+    celixThreadMutex_lock(&receiver->recvThread.mutex);
+    receiver->recvThread.running = false;
+    celixThreadMutex_unlock(&receiver->recvThread.mutex);
+    celixThread_join(receiver->recvThread.thread, NULL);
+
+    //The trackers are stopped and the receive thread is joined, so no locking is needed from here on.
+
+    //release the requested connections; entry->key is also the map key, so the map must not free keys
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry->connected) {
+            close(entry->recvSocket);
+        }
+        free(entry->key);
+        free(entry->socketAddress);
+        free(entry);
+    }
+    hashMap_destroy(receiver->requestedConnections.map, false, false);
+
+    //NOTE(review): stopping the subscriber tracker presumably removed every subscriber entry already;
+    //entries that are still present are freed here, but their msgTypes map is not released.
+    hashMap_destroy(receiver->subscribers.map, false, true);
+
+    if (receiver->topicEpollFd >= 0) {
+        close(receiver->topicEpollFd);
+    }
+    largeUdp_destroy(receiver->largeUdpHandle);
+
+    celixThreadMutex_destroy(&receiver->serializer.mutex);
+    celixThreadMutex_destroy(&receiver->subscribers.mutex);
+    celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+    celixThreadMutex_destroy(&receiver->recvThread.mutex);
+
+    free(receiver->scope);
+    free(receiver->topic);
+    free(receiver->ifIpAddress);
+    free(receiver);
+}
+
+/**
+ * Returns the pubsub admin type of this receiver, which is always PSA_UDPMC_PUBSUB_ADMIN_TYPE.
+ * The receiver argument is not used.
+ */
+const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) {
+    const char *adminType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+    return adminType;
+}
+
+/**
+ * Returns the serializer type of the currently tracked serializer service.
+ *
+ * The value is looked up under PUBSUB_SERIALIZER_TYPE_KEY in the serializer service properties
+ * while holding the serializer mutex.
+ *
+ * @return The serializer type, or NULL when no serializer properties are set or the key is absent.
+ */
+const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->serializer.mutex);
+    const celix_properties_t *serProps = receiver->serializer.props;
+    const char *serType = (serProps == NULL)
+            ? NULL
+            : celix_properties_get(serProps, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    celixThreadMutex_unlock(&receiver->serializer.mutex);
+    return serType;
+}
+
+/**
+ * Returns the scope of the receiver. The string is owned by the receiver.
+ */
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) {
+    const char *ownedScope = receiver->scope;
+    return ownedScope;
+}
+/**
+ * Returns the topic of the receiver. The string is owned by the receiver.
+ */
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) {
+    const char *ownedTopic = receiver->topic;
+    return ownedTopic;
+}
+
+/**
+ * Requests a connection to the multicast group socketAddress on port socketPort.
+ *
+ * The request is stored under the key "<socketAddress>:<socketPort>". If the entry is not connected
+ * yet, a UDP socket is created, SO_REUSEADDR is set, the multicast group is joined on the receiver's
+ * interface, the socket is bound to the port and registered at the receiver's epoll fd.
+ * When any of these steps fails the error is logged, the socket is closed again and the entry stays
+ * registered as not connected, so a later call retries.
+ *
+ * @param receiver      The topic receiver.
+ * @param socketAddress The multicast address to join.
+ * @param socketPort    The port to bind to.
+ */
+void pubsub_udpmcTopicReceiver_connectTo(
+        pubsub_updmc_topic_receiver_t *receiver,
+        const char *socketAddress,
+        long socketPort) {
+    printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
+
+    int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
+    char *key = calloc((size_t)len+1, sizeof(char));
+    if (key == NULL) {
+        fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(ENOMEM));
+        return;
+    }
+    snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_udpmc_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+    if (entry != NULL) {
+        free(key); //the existing entry already owns an identical key
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(ENOMEM));
+            free(key);
+            celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+            return;
+        }
+        entry->key = key;
+        entry->socketAddress = strndup(socketAddress, 1024 * 1024);
+        entry->socketPort = socketPort;
+        entry->connected = false;
+        hashMap_put(receiver->requestedConnections.map, (void*)entry->key, entry);
+    }
+
+    if (!entry->connected) {
+        //every step only runs when all previous steps succeeded
+        int fd = socket(AF_INET, SOCK_DGRAM, 0);
+        int rc = (fd < 0) ? -1 : 0;
+        if (rc >= 0) {
+            int reuse = 1;
+            rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse));
+        }
+        if (rc >= 0) {
+            struct ip_mreq mc_addr;
+            mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress);
+            mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress);
+            printf("Adding MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress);
+            rc = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
+        }
+        if (rc >= 0) {
+            struct sockaddr_in mcListenAddr;
+            memset(&mcListenAddr, 0, sizeof(mcListenAddr));
+            mcListenAddr.sin_family = AF_INET;
+            mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+            mcListenAddr.sin_port = htons((uint16_t )entry->socketPort);
+            rc = bind(fd, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
+        }
+        if (rc >= 0) {
+            struct epoll_event ev;
+            memset(&ev, 0, sizeof(ev));
+            ev.events = EPOLLIN;
+            ev.data.fd = fd;
+            rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, fd, &ev);
+        }
+
+        if (rc < 0) {
+            //no library call was made after the failing step, so errno still belongs to it;
+            //save it because fprintf and close may overwrite it
+            int savedErrno = errno;
+            fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(savedErrno));
+            if (fd >= 0) {
+                close(fd); //do not leak the half configured socket; a retry creates a new one
+            }
+            entry->recvSocket = -1;
+        } else {
+            entry->recvSocket = fd;
+            entry->connected = true;
+        }
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+/**
+ * Removes the requested connection for socketAddress:socketPort.
+ *
+ * If the connection exists and is connected, its socket is removed from the receiver's epoll fd and
+ * closed. The entry itself is always removed from the requested connections map and freed.
+ * Unknown connections are ignored.
+ *
+ * @param receiver      The topic receiver.
+ * @param socketAddress The multicast address of the connection.
+ * @param socketPort    The port of the connection.
+ */
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
+    printf("[PSA UDPMC] TopicReceiver %s/%s disconnect from socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
+
+    int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort);
+    char *key = calloc((size_t)len+1, sizeof(char));
+    if (key == NULL) {
+        fprintf(stderr, "[PSA UDPMC] Error disconnecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(ENOMEM));
+        return;
+    }
+    snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_udpmc_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, key);
+    free(key);
+    if (entry != NULL) {
+        if (entry->connected) {
+            struct epoll_event ev;
+            memset(&ev, 0, sizeof(ev));
+            int rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_DEL, entry->recvSocket, &ev);
+            if (rc < 0) {
+                fprintf(stderr, "[PSA UDPMC] Error disconnecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno));
+            }
+            //the entry owns the socket; closing it also leaves the multicast group
+            close(entry->recvSocket);
+        }
+        free(entry->key);
+        free(entry->socketAddress);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+/**
+ * Serializer tracker callback: stores the (possibly NULL) serializer service and its properties
+ * in the receiver while holding the serializer mutex.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The serializer service, NULL when no serializer is available.
+ * @param props  The service properties belonging to svc.
+ */
+static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_updmc_topic_receiver_t *self = handle;
+
+    if (svc == NULL) {
+        //TODO -> no serializer -> remove all publishers
+    }
+
+    celixThreadMutex_lock(&self->serializer.mutex);
+    self->serializer.props = props;
+    self->serializer.svc = (pubsub_serializer_service_t *)svc;
+    celixThreadMutex_unlock(&self->serializer.mutex);
+}
+
+/**
+ * Subscriber tracker callback: registers a subscriber service of bundle bnd at the receiver.
+ *
+ * Subscribers whose scope (PUBSUB_SUBSCRIBER_SCOPE, "default" when absent) is not exactly the
+ * receiver's scope are ignored. There is one entry per bundle: the first subscriber of a bundle
+ * creates the entry, stores its service and requests the message serializer map for the bundle;
+ * further subscribers of the same bundle only increase the usage count.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The subscriber service (pubsub_subscriber_t).
+ * @param props  The service properties of the subscriber.
+ * @param bnd    The bundle owning the subscriber service.
+ */
+static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_updmc_topic_receiver_t *receiver = handle;
+
+    long bndId = celix_bundle_getId(bnd);
+    const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default");
+    //full comparison: a length limited compare would also accept scopes that merely start with the receiver scope
+    if (strcmp(subScope, receiver->scope) != 0) {
+        //not the same scope. ignore
+        return;
+    }
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->usageCount += 1;
+    } else {
+        //new create entry
+        entry = calloc(1, sizeof(*entry));
+        if (entry == NULL) {
+            fprintf(stderr, "Cannot allocate subscriber entry for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+            celixThreadMutex_unlock(&receiver->subscribers.mutex);
+            return;
+        }
+        entry->usageCount = 1;
+        entry->svc = svc;
+
+        celixThreadMutex_lock(&receiver->serializer.mutex);
+        if (receiver->serializer.svc != NULL) {
+            receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
+        } else {
+            fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s\n", receiver->scope, receiver->topic);
+        }
+        celixThreadMutex_unlock(&receiver->serializer.mutex);
+
+        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+/**
+ * Subscriber tracker callback: unregisters a subscriber service of bundle bnd.
+ *
+ * Decreases the usage count of the bundle's entry. When the count drops to zero (or below) the
+ * entry is removed from the subscribers map, its message serializer map is handed back to the
+ * serializer service (when one is available) and the entry is freed. Unknown bundles are ignored.
+ *
+ * @param handle The topic receiver.
+ * @param svc    The subscriber service (unused).
+ * @param props  The service properties of the subscriber (unused).
+ * @param bnd    The bundle owning the subscriber service.
+ */
+static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_updmc_topic_receiver_t *self = handle;
+    const long ownerId = celix_bundle_getId(bnd);
+
+    celixThreadMutex_lock(&self->subscribers.mutex);
+    psa_udpmc_subscriber_entry_t *subEntry = hashMap_get(self->subscribers.map, (void*)ownerId);
+    if (subEntry != NULL) {
+        subEntry->usageCount -= 1;
+        if (subEntry->usageCount <= 0) {
+            //last user of the bundle is gone: drop the entry
+            hashMap_remove(self->subscribers.map, (void*)ownerId);
+
+            celixThreadMutex_lock(&self->serializer.mutex);
+            pubsub_serializer_service_t *ser = self->serializer.svc;
+            if (ser == NULL) {
+                fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", self->scope, self->topic);
+            } else {
+                ser->destroySerializerMap(ser->handle, subEntry->msgTypes);
+            }
+            celixThreadMutex_unlock(&self->serializer.mutex);
+
+            free(subEntry);
+        }
+    }
+    celixThreadMutex_unlock(&self->subscribers.mutex);
+}
+
+/**
+ * Entry point of the receive thread.
+ *
+ * As long as recvThread.running (read under recvThread.mutex) is true, waits up to
+ * RECV_THREAD_TIMEOUT seconds on the receiver's epoll fd. For every reported socket the large UDP
+ * handle is asked whether a message is available; available messages are read, passed to
+ * psa_udpmc_processMsg and freed afterwards.
+ *
+ * @param data The topic receiver.
+ * @return Always NULL.
+ */
+static void* psa_udpmc_recvThread(void * data) {
+    pubsub_updmc_topic_receiver_t *self = data;
+    struct epoll_event readyEvents[MAX_EPOLL_EVENTS];
+
+    for (;;) {
+        celixThreadMutex_lock(&self->recvThread.mutex);
+        bool keepRunning = self->recvThread.running;
+        celixThreadMutex_unlock(&self->recvThread.mutex);
+        if (!keepRunning) {
+            break;
+        }
+
+        //a negative result (error) simply skips the loop below
+        int nrOfEvents = epoll_wait(self->topicEpollFd, readyEvents, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+        for (int evt = 0; evt < nrOfEvents; evt++) {
+            unsigned int index;
+            unsigned int size;
+            if (largeUdp_dataAvailable(self->largeUdpHandle, readyEvents[evt].data.fd, &index, &size) != true) {
+                continue;
+            }
+
+            // Handle data
+            pubsub_udp_msg_t *udpMsg = NULL;
+            if (largeUdp_read(self->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
+                printf("[PSA_UDPMC]: ERROR largeUdp_read with index %d\n", index);
+                continue;
+            }
+
+            psa_udpmc_processMsg(self, udpMsg);
+            free(udpMsg);
+        }
+    }
+
+    return NULL;
+}
+
+/**
+ * Delivers a received message to every registered subscriber entry.
+ *
+ * For each entry the message serializer for msg->header.type is looked up, the message version is
+ * checked with psa_udpmc_checkVersion and the payload is deserialized. On success the subscriber's
+ * receive callback is called; the deserialized message is freed afterwards unless the subscriber
+ * cleared the release flag. Each failing step is logged and the entry is skipped.
+ * The subscribers mutex is held during the whole delivery.
+ *
+ * @param receiver The topic receiver.
+ * @param msg      The received message.
+ */
+static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+
+    hash_map_iterator_t subIter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&subIter)) {
+        psa_udpmc_subscriber_entry_t *subEntry = hashMapIterator_nextValue(&subIter);
+
+        pubsub_msg_serializer_t *msgSer = (subEntry->msgTypes != NULL)
+                ? hashMap_get(subEntry->msgTypes, (void *) (uintptr_t) msg->header.type)
+                : NULL;
+        if (msgSer == NULL) {
+            printf("[PSA_UDPMC] Serializer not available for message %d.\n",msg->header.type);
+            continue;
+        }
+
+        if (!psa_udpmc_checkVersion(msgSer->msgVersion, &msg->header)) {
+            int major=0,minor=0;
+            version_getMajor(msgSer->msgVersion,&major);
+            version_getMinor(msgSer->msgVersion,&minor);
+            printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                    msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
+            continue;
+        }
+
+        //NOTE(review): the input length is passed as 0 and not msg->payloadSize - confirm the serializer ignores it
+        void *msgInst = NULL;
+        celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst);
+        if (status != CELIX_SUCCESS) {
+            printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
+            continue;
+        }
+
+        pubsub_multipart_callbacks_t mp_callbacks;
+        mp_callbacks.handle = receiver;
+        mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
+        mp_callbacks.getMultipart = NULL;
+
+        //the subscriber may set release to false to keep the message
+        bool release = true;
+        pubsub_subscriber_t *subscriber = subEntry->svc;
+        subscriber->receive(subscriber->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+        if (release) {
+            msgSer->freeMsg(msgSer,msgInst);
+        }
+    }
+
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
new file mode 100644
index 0000000..ee2c113
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
+
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t; //opaque receiver handle; NOTE(review): "updmc" looks like a typo of "udpmc" but is part of the public name
+
+pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx,
+ const char *scope,
+ const char *topic,
+ const char *ifIP,
+ long serializerSvcId); //creates the receiver, starts its service trackers and its receive thread
+void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver); //stops the trackers and the receive thread and frees the receiver; NULL is accepted
+
+const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver); //always PSA_UDPMC_PUBSUB_ADMIN_TYPE
+const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver); //NULL when no serializer properties are known
+const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver); //string owned by the receiver
+const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver); //string owned by the receiver
+const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_updmc_topic_receiver_t *receiver); //NOTE(review): pubsub_udpmc_topic_receiver.c in this commit contains no definition for this function
+
+void pubsub_udpmcTopicReceiver_connectTo(
+ pubsub_updmc_topic_receiver_t *receiver,
+ const char *socketAddress,
+ long socketPort); //joins the multicast group socketAddress and starts receiving on socketPort
+void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort); //removes the connection requested with connectTo
+
+#endif //CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 5553d3b..4a6d027 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -1,17 +1,52 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
#include <pubsub_serializer.h>
#include <stdlib.h>
#include <memory.h>
#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <pubsub_common.h>
+#include <zconf.h>
+#include <arpa/inet.h>
#include "pubsub_udpmc_topic_sender.h"
#include "pubsub_psa_udpmc_constants.h"
+#include "large_udp.h"
+#include "pubsub_udpmc_common.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS 2 //seconds slept once before the very first send, see delay_first_send_for_late_joiners
+
+//TODO make configurable
+#define UDP_BASE_PORT 49152 //lower bound passed to rand_range when a sender picks its port
+#define UDP_MAX_PORT 65000 //upper bound passed to rand_range when a sender picks its port
-static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
struct pubsub_updmc_topic_sender {
celix_bundle_context_t *ctx;
char *scope;
char *topic;
char *socketAddress;
+ long socketPort;
+
+ int sendSocket;
+ struct sockaddr_in destAddr;
long serTrackerId;
struct {
@@ -19,26 +54,95 @@ struct pubsub_updmc_topic_sender {
pubsub_serializer_service_t *svc;
const celix_properties_t *props;
} serializer;
+
+ struct {
+ long svcId;
+ celix_service_factory_t factory;
+ } publisher;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = bndId, value = psa_udpmc_bounded_service_entry_t
+ } boundedServices;
};
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, long serializerSvcId) {
+typedef struct psa_udpmc_bounded_service_entry { //publisher service instance handed out to one requesting bundle
+ pubsub_updmc_topic_sender_t *parent; //sender that created the entry; provides topic, send socket and destination address
+ pubsub_publisher_t service; //service struct returned by the service factory; its handle points back to this entry
+ long bndId; //id of the requesting bundle, also the key in the boundedServices map
+ hash_map_t *msgTypes; //message serializer map created by the serializer service; stays NULL when that failed
+ int getCount; //number of outstanding getService calls of the bundle
+ largeUdp_pt largeUdpHandle; //created with largeUdp_create(1) and used for sending
+} psa_udpmc_bounded_service_entry_t;
+
+typedef struct pubsub_msg{ //message as passed to psa_udpmc_sendMsg, which sends header, payloadSize and payload as three iovec parts
+ pubsub_msg_header_t *header; //topic, message type id and version of the message
+ char* payload; //serialized message
+ unsigned int payloadSize; //length of payload in bytes, used as iov_len of the payload part
+} pubsub_msg_t;
+
+static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+ celix_bundle_context_t *ctx,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId,
+ int sendSocket,
+ const char *bindIP) {
pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->scope = strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
celixThreadMutex_create(&sender->serializer.mutex, NULL);
+ celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+ sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+ //setting up socket for UDPMC TopicSender
+ {
+ unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT);
+ sender->sendSocket = sendSocket;
+ sender->destAddr.sin_family = AF_INET;
+ sender->destAddr.sin_addr.s_addr = inet_addr(bindIP);
+ sender->destAddr.sin_port = htons((uint16_t)port);
+
+ sender->socketAddress = strndup(bindIP, 1024);
+ sender->socketPort = port;
+ }
+
+ //track serializer svc based on the provided serializerSvcId
+ {
+ char filter[64];
+ snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
+
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+ opts.filter.filter = filter;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = sender;
+ opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
+ sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
+ //register publisher services using a service factory
+ {
+ sender->publisher.factory.handle = sender;
+ sender->publisher.factory.getService = psa_udpmc_getPublisherService;
+ sender->publisher.factory.ungetService = psa_udpmc_ungetPublisherService;
- char filter[64];
- snprintf(filter, 64, "(service.id=%li)", serializerSvcId);
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+ celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.filter = filter;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = sender;
- opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
- sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+ sender->publisher.svcId = celix_bundleContext_registerServiceFactory(ctx, &sender->publisher.factory, PUBSUB_PUBLISHER_SERVICE_NAME, props);
+ }
return sender;
}
@@ -46,11 +150,17 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context
void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) { //stops the tracker, unregisters the publisher factory and frees the sender; NULL is ignored
if (sender != NULL) {
celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
+ celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId); //publisher service factory registered in create
celixThreadMutex_destroy(&sender->serializer.mutex);
+ celixThreadMutex_destroy(&sender->boundedServices.mutex);
+
+ //TODO loop and cleanup?
+ hashMap_destroy(sender->boundedServices.map, false, true); //NOTE(review): presumably frees only the remaining entry structs; their largeUdpHandle and msgTypes are not released here
free(sender->scope);
free(sender->topic);
+ free(sender->socketAddress);
free(sender);
}
}
@@ -81,8 +191,12 @@ const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *s
return sender->socketAddress;
}
+/**
+ * Returns the port that was chosen for this sender when it was created.
+ */
+long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender) {
+    const long chosenPort = sender->socketPort;
+    return chosenPort;
+}
+
void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
- //TODO
+ //TODO subscriber count -> topic info
}
void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
@@ -92,8 +206,173 @@ void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender,
static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
pubsub_updmc_topic_sender_t *sender = handle;
pubsub_serializer_service_t *ser = svc;
+
+ if (ser == NULL) {
+ //TODO -> no serializer -> remove all publishers
+ }
+
celixThreadMutex_lock(&sender->serializer.mutex);
sender->serializer.svc = ser;
sender->serializer.props = props;
celixThreadMutex_unlock(&sender->serializer.mutex);
-}
\ No newline at end of file
+}
+
+/**
+ * Service factory callback: returns the publisher service for the requesting bundle.
+ *
+ * There is one entry per bundle. The first request of a bundle creates the entry together with its
+ * large UDP handle, asks the serializer service for the bundle's message serializer map and stores
+ * the entry in the boundedServices map; later requests only increase the get count.
+ * A missing serializer or a failing map creation is logged, the entry is still created.
+ *
+ * @param handle           The topic sender.
+ * @param requestingBundle The bundle requesting the publisher service.
+ * @param svcProperties    Unused.
+ * @return Pointer to the pubsub_publisher_t of the bundle's entry.
+ */
+static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_updmc_topic_sender_t *self = handle;
+    const long requesterId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&self->boundedServices.mutex);
+    psa_udpmc_bounded_service_entry_t *bound = hashMap_get(self->boundedServices.map, (void*)requesterId);
+    if (bound == NULL) {
+        bound = calloc(1, sizeof(*bound));
+        bound->getCount = 1;
+        bound->parent = self;
+        bound->bndId = requesterId;
+        bound->largeUdpHandle = largeUdp_create(1);
+
+        celixThreadMutex_lock(&self->serializer.mutex);
+        pubsub_serializer_service_t *ser = self->serializer.svc;
+        celix_status_t rc = CELIX_SUCCESS;
+        if (ser != NULL) {
+            rc = ser->createSerializerMap(ser->handle, (celix_bundle_t*)requestingBundle, &bound->msgTypes);
+        }
+        if (ser == NULL || rc != CELIX_SUCCESS) {
+            //TODO destroy and return NULL?
+            fprintf(stderr, "Error creating publisher service, serializer not available / cannot get msg serializer map\n");
+        }
+        celixThreadMutex_unlock(&self->serializer.mutex);
+
+        //the service handle points back to the entry so send can find its state
+        bound->service.handle = bound;
+        bound->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
+        bound->service.send = psa_udpmc_topicPublicationSend;
+        bound->service.sendMultipart = NULL; //note multipart not supported by MCUDP
+
+        hashMap_put(self->boundedServices.map, (void*)requesterId, bound);
+    } else {
+        bound->getCount += 1;
+    }
+    celixThreadMutex_unlock(&self->boundedServices.mutex);
+
+    return &bound->service;
+}
+
+/**
+ * Service factory callback: releases the publisher service of the requesting bundle.
+ *
+ * Decreases the get count of the bundle's entry. When the count reaches zero the entry is removed
+ * from the boundedServices map, its message serializer map is handed back to the serializer
+ * service (failures are logged), its large UDP handle is destroyed and the entry is freed.
+ * Bundles without an entry are ignored.
+ *
+ * @param handle           The topic sender.
+ * @param requestingBundle The bundle releasing the publisher service.
+ * @param svcProperties    Unused.
+ */
+static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
+    pubsub_updmc_topic_sender_t *self = handle;
+    const long requesterId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&self->boundedServices.mutex);
+    psa_udpmc_bounded_service_entry_t *bound = hashMap_get(self->boundedServices.map, (void*)requesterId);
+    if (bound != NULL) {
+        bound->getCount -= 1;
+        if (bound->getCount == 0) {
+            //free entry
+            hashMap_remove(self->boundedServices.map, (void*)requesterId);
+
+            celixThreadMutex_lock(&self->serializer.mutex);
+            pubsub_serializer_service_t *ser = self->serializer.svc;
+            celix_status_t rc = CELIX_SUCCESS;
+            if (ser != NULL) {
+                rc = ser->destroySerializerMap(ser->handle, bound->msgTypes);
+            }
+            if (ser == NULL || rc != CELIX_SUCCESS) {
+                fprintf(stderr, "Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
+            }
+            celixThreadMutex_unlock(&self->serializer.mutex);
+
+            largeUdp_destroy(bound->largeUdpHandle);
+            free(bound);
+        }
+    }
+    celixThreadMutex_unlock(&self->boundedServices.mutex);
+}
+
+/**
+ * Publisher send callback: serializes inMsg and sends it over the sender's UDP multicast socket.
+ *
+ * The header carries the sender's topic, the message type id and, when the serializer has a
+ * version, the major and minor version of the message. Header and message descriptor live on the
+ * stack for the duration of the call; only the serialized payload is heap allocated by the
+ * serializer and freed here.
+ *
+ * @param handle    The psa_udpmc_bounded_service_entry_t the publisher service belongs to.
+ * @param msgTypeId The local message type id of inMsg.
+ * @param inMsg     The message to send.
+ * @return 0 when the message was sent, -1 when no serializer is available for msgTypeId,
+ *         serialization failed or sending failed.
+ */
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+    psa_udpmc_bounded_service_entry_t *entry = handle;
+
+    pubsub_msg_serializer_t* msgSer = NULL;
+    if (entry->msgTypes != NULL) {
+        msgSer = hashMap_get(entry->msgTypes, (void*)(intptr_t)(msgTypeId));
+    }
+    if (msgSer == NULL) {
+        printf("[PSA_UDPMC/TopicSender] No msg serializer available for msg type id %d\n", msgTypeId);
+        return -1;
+    }
+
+    pubsub_msg_header_t msgHdr;
+    memset(&msgHdr, 0, sizeof(msgHdr));
+    //the header is zeroed and at most MAX_TOPIC_LEN-1 bytes are copied, so topic stays NUL terminated
+    strncpy(msgHdr.topic, entry->parent->topic, MAX_TOPIC_LEN-1);
+    msgHdr.type = msgTypeId;
+    if (msgSer->msgVersion != NULL) {
+        int major = 0, minor = 0;
+        version_getMajor(msgSer->msgVersion, &major);
+        version_getMinor(msgSer->msgVersion, &minor);
+        msgHdr.major = major;
+        msgHdr.minor = minor;
+    }
+
+    void* serializedOutput = NULL;
+    size_t serializedOutputLen = 0;
+    celix_status_t rc = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen);
+    if (rc != CELIX_SUCCESS) {
+        //do not put a message without a valid payload on the wire
+        fprintf(stderr, "[PSA_UDPMC/TopicSender] Cannot serialize msg type id %d\n", msgTypeId);
+        free(serializedOutput);
+        return -1;
+    }
+
+    pubsub_msg_t msg;
+    msg.header = &msgHdr;
+    msg.payload = (char*)serializedOutput;
+    msg.payloadSize = (unsigned int)serializedOutputLen;
+
+    int status = psa_udpmc_sendMsg(entry, &msg, true, NULL) ? 0 : -1;
+
+    free(serializedOutput);
+    return status;
+}
+
+/**
+ * Sleeps FIRST_SEND_DELAY_IN_SECONDS seconds on the first call only; later calls return directly.
+ * The "first call" flag is a function-local static, so it is shared by all callers in this file.
+ */
+static void delay_first_send_for_late_joiners(){
+    static bool firstSend = true;
+
+    if (!firstSend) {
+        return;
+    }
+
+    printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+    sleep(FIRST_SEND_DELAY_IN_SECONDS);
+    firstSend = false;
+}
+
+/**
+ * Sends msg as three parts (header, payload size, payload) using the entry's large UDP handle,
+ * the parent sender's socket and the parent sender's destination address.
+ *
+ * The very first send of the process is delayed, see delay_first_send_for_late_joiners.
+ *
+ * @param entry           The bounded service entry the message is sent for.
+ * @param msg             The message to send.
+ * @param last            Unused.
+ * @param releaseCallback When not NULL, its release function is called with the payload and the
+ *                        entry after the send attempt.
+ * @return true when the message was sent, false when largeUdp_sendmsg reported an error.
+ */
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) {
+    // header + size + payload
+    struct iovec parts[] = {
+        { .iov_base = msg->header,       .iov_len = sizeof(*msg->header) },
+        { .iov_base = &msg->payloadSize, .iov_len = sizeof(msg->payloadSize) },
+        { .iov_base = msg->payload,      .iov_len = msg->payloadSize },
+    };
+    const int nrOfParts = (int)(sizeof(parts) / sizeof(parts[0]));
+    pubsub_updmc_topic_sender_t *sender = entry->parent;
+
+    delay_first_send_for_late_joiners();
+
+    bool sent = true;
+    if (largeUdp_sendmsg(entry->largeUdpHandle, sender->sendSocket, parts, nrOfParts, 0, &sender->destAddr, sizeof(sender->destAddr)) == -1) {
+        perror("send_pubsub_msg:sendSocket");
+        sent = false;
+    }
+
+    if (releaseCallback) {
+        releaseCallback->release(msg->payload, entry);
+    }
+    return sent;
+}
+
+/**
+ * Returns a pseudo random number in the inclusive range [min, max], based on random().
+ *
+ * The random value is scaled with RAND_MAX + 1 so that the scale factor is always below 1.0;
+ * scaling with RAND_MAX alone would allow max + 1 as result.
+ *
+ * @param min Lowest value that can be returned.
+ * @param max Highest value that can be returned, expected to be >= min.
+ */
+static unsigned int rand_range(unsigned int min, unsigned int max){
+    double scaled = ((double)random())/((double)RAND_MAX + 1.0);
+    //compute the width in double so max - min + 1 cannot wrap around
+    double width = (double)max - (double)min + 1.0;
+    return (unsigned int)(width*scaled + (double)min);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
index 38a9127..89e56ec 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -23,7 +23,13 @@
typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t;
-pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, /*TODO rest args*/ const char *scope, const char *topic, long serializerSvcId);
+pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(
+ celix_bundle_context_t *ctx,
+ const char *scope,
+ const char *topic,
+ long serializerSvcId,
+ int sendSocket,
+ const char *bindIP);
void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender);
const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender);
@@ -31,6 +37,8 @@ const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *
const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender);
const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender);
const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender);
+long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender);
+
//TODO connections etc
void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index eea8460..2fc80ee 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -76,7 +76,7 @@ typedef struct publish_bundle_bound_service {
typedef struct pubsub_msg{
- pubsub_msg_header_pt header;
+ pubsub_msg_header_t *header;
char* payload;
unsigned int payloadSize;
} pubsub_msg_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
index 46a1688..387935e 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -110,7 +110,7 @@ typedef struct msg_map_entry{
static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service);
static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service);
static void* zmq_recv_thread_func(void* arg);
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_t *hdr);
static void sigusr1_sighandler(int signo);
static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 8b2a2d4..81e467d 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -29,6 +29,7 @@
#include "pubsub_common.h"
#include "pubsub_listeners.h"
#include "pubsub_discovery_impl.h"
+#include "../../../shell/shell/include/command.h"
typedef struct psd_activator {
pubsub_discovery_t *pubsub_discovery;
@@ -38,6 +39,9 @@ typedef struct psd_activator {
pubsub_announce_endpoint_listener_t listenerSvc;
long listenerSvcId;
+
+ command_service_t cmdSvc;
+ long cmdSvcId;
} psd_activator_t;
static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) {
@@ -48,7 +52,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
status = pubsub_discovery_start(act->pubsub_discovery);
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE;
+ opts.filter.serviceName = PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE;
opts.callbackHandle = act->pubsub_discovery;
opts.addWithOwner = pubsub_discovery_discoveredEndpointsListenerAdded;
opts.removeWithOwner = pubsub_discovery_discoveredEndpointsListenerRemoved;
@@ -58,6 +62,18 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
+ //register shell command service
+ if (status == CELIX_SUCCESS) {
+ act->cmdSvc.handle = act->pubsub_discovery;
+ act->cmdSvc.executeCommand = pubsub_discovery_executeCommand;
+ celix_properties_t *props = celix_properties_create();
+ properties_set(props, OSGI_SHELL_COMMAND_NAME, "psd_etcd");
+ properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psd_etcd"); //TODO add search topic/scope option
+ properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Overview of discovered/announced endpoints from/to ETCD");
+ act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ }
+
if (status == CELIX_SUCCESS) {
act->listenerSvcId = celix_bundleContext_registerService(ctx, &act->listenerSvc, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, NULL);
} else {
@@ -70,6 +86,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_stopTracker(ctx, act->publishAnnounceSvcTrackerId);
celix_bundleContext_unregisterService(ctx, act->listenerSvcId);
+ celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery);
pubsub_discovery_destroy(act->pubsub_discovery);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 98d6be6..237f2f2 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -63,8 +63,11 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove
celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsListenersMutex, NULL);
celixThreadMutex_create(&(*ps_discovery)->announcedEndpointsMutex, NULL);
celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsMutex, NULL);
- celixThreadMutex_create(&(*ps_discovery)->waitMutex, NULL);
- celixThreadCondition_init(&(*ps_discovery)->waitCond, NULL);
+ pthread_mutex_init(&(*ps_discovery)->waitMutex, NULL);
+ pthread_condattr_t attr;
+ pthread_condattr_init(&attr);
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
+ pthread_cond_init(&(*ps_discovery)->waitCond, &attr);
celixThreadMutex_create(&(*ps_discovery)->runningMutex, NULL);
(*ps_discovery)->running = true;
@@ -102,8 +105,8 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
celixThreadMutex_unlock(&ps_discovery->announcedEndpointsMutex);
celixThreadMutex_destroy(&ps_discovery->announcedEndpointsMutex);
- celixThreadMutex_destroy(&ps_discovery->waitMutex);
- celixThreadCondition_destroy(&ps_discovery->waitCond);
+ pthread_mutex_destroy(&ps_discovery->waitMutex);
+ pthread_cond_destroy(&ps_discovery->waitCond);
celixThreadMutex_destroy(&ps_discovery->runningMutex);
@@ -246,7 +249,7 @@ void* psd_refresh(void *data) {
while (running) {
struct timespec start;
- clock_gettime(CLOCK_REALTIME, &start);
+ clock_gettime(CLOCK_MONOTONIC, &start);
celixThreadMutex_lock(&disc->announcedEndpointsMutex);
hash_map_iterator_t iter = hashMapIterator_construct(disc->announcedEndpoints);
@@ -273,9 +276,9 @@ void* psd_refresh(void *data) {
struct timespec waitTill = start;
waitTill.tv_sec += disc->sleepInsecBetweenTTLRefresh;
- celixThreadMutex_lock(&disc->waitMutex);
- pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread
- celixThreadMutex_unlock(&disc->waitMutex);
+ pthread_mutex_lock(&disc->waitMutex);
+ pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread (including MONOTONIC ..)
+ pthread_mutex_unlock(&disc->waitMutex);
celixThreadMutex_lock(&disc->runningMutex);
running = disc->running;
@@ -438,9 +441,9 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
assert(fwUUID != NULL);
if (fwUUID != NULL && strncmp(disc->fwUUID, fwUUID, strlen(disc->fwUUID)) == 0) {
- if (disc->verbose) {
- printf("[PSD] Ignoring endpoint %s from own framework\n", uuid);
- }
+// if (disc->verbose) {
+// printf("[PSD] Ignoring endpoint %s from own framework\n", uuid);
+// }
return;
}
@@ -553,3 +556,10 @@ static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props
json_decref(jsEndpoint);
return str;
}
+
+/**
+ * Shell command callback for the pubsub discovery. Currently a placeholder: it
+ * writes "TODO" followed by a newline to the output stream and does nothing else.
+ *
+ * @param handle      Callback handle; not used yet.
+ * @param commandLine Unused.
+ * @param os          Stream the placeholder text is written to.
+ * @param errorStream Unused.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+    (void)handle; //TODO use the handle once the command prints an actual overview
+    fputs("TODO\n", os);
+    return CELIX_SUCCESS;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index f0a5b22..a1af837 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -56,8 +56,9 @@ typedef struct pubsub_discovery {
celix_thread_mutex_t discoveredEndpointsListenersMutex;
hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discovered_endpoint_listener_t
- celix_thread_mutex_t waitMutex;
- celix_thread_cond_t waitCond;
+ //NOTE using pthread instead of celix mutex/cond so that condwait with abs time using a MONOTONIC clock can be used
+ pthread_mutex_t waitMutex;
+ pthread_cond_t waitCond;
celix_thread_mutex_t runningMutex;
bool running;
@@ -91,4 +92,7 @@ void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc
celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream);
+
+
#endif /* PUBSUB_DISCOVERY_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 8c15fcf..9157861 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -46,7 +46,7 @@ struct pubsub_admin_service {
celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
- celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, double *score);
+ celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
//note endpoint is owned by caller
celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
index e551031..3231400 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_common.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
@@ -40,7 +40,7 @@ struct pubsub_msg_header{
unsigned char minor;
};
-typedef struct pubsub_msg_header* pubsub_msg_header_pt;
+typedef struct pubsub_msg_header pubsub_msg_header_t;
#endif /* PUBSUB_COMMON_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 14f9bb0..66cc44a 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -60,13 +60,10 @@ double pubsub_utils_matchSubscriber(
double defaultScore,
long *outSerializerSvcId);
-double pubsub_utils_matchEndpoint(
+bool pubsub_utils_matchEndpoint(
celix_bundle_context_t *ctx,
const celix_properties_t *endpoint,
const char *adminType,
- double sampleScore,
- double controlScore,
- double defaultScore,
long *outSerializerSvcId);
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 0b3c742..e10169f 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -85,7 +85,14 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
}
}
-celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props) {
+celix_properties_t* pubsubEndpoint_create(
+ const char* fwUUID,
+ const char* scope,
+ const char* topic,
+ const char* pubsubType,
+ const char* adminType,
+ const char *serType,
+ celix_properties_t *topic_props) {
celix_properties_t *ep = properties_create();
pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
if (!pubsubEndpoint_isValid(ep, true, true)) {
http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index 60f7883..c21d597 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -183,7 +183,7 @@ double pubsub_utils_matchSubscriber(
score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
}
- printf("Score subscriber service match for psa type %s is %f", adminType, score);
+ printf("Score subscriber service match for psa type %s is %f\n", adminType, score);
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
@@ -196,35 +196,32 @@ double pubsub_utils_matchSubscriber(
return score;
}
-double pubsub_utils_matchEndpoint(
+bool pubsub_utils_matchEndpoint(
celix_bundle_context_t *ctx,
const celix_properties_t *ep,
const char *adminType,
- double sampleScore,
- double controlScore,
- double defaultScore,
long *outSerializerSvcId) {
- const char *requested_admin = NULL;
- const char *requested_qos = NULL;
- if (ep != NULL) {
- requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
- requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
+ bool psaMatch = false;
+ const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
+ if (configured_admin != NULL) {
+ psaMatch = strncmp(configured_admin, adminType, strlen(adminType)) == 0;
}
- double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
-
- const char *requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
- long serializerSvcId = getPSASerializer(ctx, requested_serializer);
- if (serializerSvcId < 0) {
- score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
+ bool serMatch = false;
+ long serializerSvcId = -1L;
+ if (psaMatch) {
+ const char *configured_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+ serializerSvcId = getPSASerializer(ctx, configured_serializer);
+ serMatch = serializerSvcId >= 0;
}
- printf("Score endpoint match for psa type %s is %f", adminType, score);
+ bool match = psaMatch && serMatch;
+ printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
}
- return score;
+ return match;
}
\ No newline at end of file