You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:25 UTC
[08/34] celix git commit: CELIX-454: More PubSub refactoring. Started
creating a new skeleton for psa udpmc based on an updated pubsub spi.
CELIX-454: More PubSub refactoring. Started creating a new skeleton for psa udpmc based on an updated pubsub spi.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/69596cfd
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/69596cfd
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/69596cfd
Branch: refs/heads/develop
Commit: 69596cfdf93502eb2f723571394ab36537ea37ee
Parents: e30a70f
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Mon Sep 24 21:15:31 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Mon Sep 24 21:15:31 2018 +0200
----------------------------------------------------------------------
bundles/pubsub/CMakeLists.txt | 2 +
bundles/pubsub/examples/CMakeLists.txt | 40 +-
.../private/include/pubsub_publisher_private.h | 13 +-
.../publisher/private/src/ps_pub_activator.c | 108 +-
.../publisher/private/src/pubsub_publisher.c | 14 +-
.../pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 8 +-
.../pubsub_admin_udp_mc/src/psa_activator.c | 155 +--
.../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 318 +++--
.../pubsub_admin_udp_mc/src/pubsub_admin_impl.h | 37 +-
.../src/pubsub_udpmc_admin.c | 335 +++++
.../src/pubsub_udpmc_admin.h | 92 ++
.../src/pubsub_udpmc_topic_sender.c | 99 ++
.../src/pubsub_udpmc_topic_sender.h | 39 +
.../pubsub_admin_udp_mc/src/topic_publication.c | 30 +-
.../pubsub_admin_udp_mc/src/topic_publication.h | 6 +-
.../src/topic_subscription.c | 4 +-
.../src/topic_subscription.h | 4 +-
.../pubsub/pubsub_admin_zmq/src/psa_activator.c | 245 ++--
.../pubsub_admin_zmq/src/pubsub_admin_impl.c | 136 +-
.../pubsub_admin_zmq/src/pubsub_admin_impl.h | 42 +-
.../pubsub_admin_zmq/src/topic_publication.c | 12 +-
.../pubsub_admin_zmq/src/topic_publication.h | 6 +-
.../pubsub_admin_zmq/src/topic_subscription.h | 4 +-
.../src/pubsub_discovery_impl.c | 174 ++-
.../src/pubsub_discovery_impl.h | 13 +-
.../pubsub_serializer_json/CMakeLists.txt | 2 +-
.../pubsub_serializer_json/src/ps_activator.c | 108 --
.../src/ps_json_serializer_activator.c | 59 +
.../src/pubsub_serializer_impl.h | 9 +-
bundles/pubsub/pubsub_spi/CMakeLists.txt | 2 +-
.../pubsub/pubsub_spi/include/pubsub_admin.h | 53 +-
.../pubsub_spi/include/pubsub_admin_match.h | 47 -
.../pubsub/pubsub_spi/include/pubsub_common.h | 4 -
.../pubsub_spi/include/pubsub_constants.h | 4 -
.../pubsub/pubsub_spi/include/pubsub_endpoint.h | 30 +-
.../pubsub_spi/include/pubsub_serializer.h | 18 +-
.../pubsub/pubsub_spi/include/pubsub_utils.h | 47 +-
.../pubsub/pubsub_spi/src/pubsub_admin_match.c | 169 ---
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 270 ++--
bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 31 +
.../pubsub/pubsub_spi/src/pubsub_utils_match.c | 230 ++++
.../src/pstm_activator.c | 38 +-
.../src/pubsub_topology_manager.c | 1185 +++++++++---------
.../src/pubsub_topology_manager.h | 81 +-
libs/framework/include/celix_api.h | 24 +-
libs/framework/include/celix_bundle_context.h | 11 +
libs/framework/src/bundle_context.c | 14 +
libs/framework/src/framework.c | 2 +-
libs/utils/include/celix_properties.h | 6 +-
libs/utils/include/celix_threads.h | 7 +-
libs/utils/src/celix_threads.c | 18 +-
libs/utils/src/properties.c | 36 +-
52 files changed, 2570 insertions(+), 1871 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index e3db995..d7ae3f8 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -19,6 +19,8 @@ celix_subproject(PUBSUB "Option to build the pubsub bundles" OFF DEPS UTILS)
if (PUBSUB)
option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" OFF)
+ set(BUILD_PUBSUB_PSA_ZMQ OFF)
+ message(WARNING "TODO enable PSA_ZMQ again, for now disabled because refactoring is needed")
if (BUILD_PUBSUB_PSA_ZMQ)
message(WARNING "Celix will now contain a dependency with a LGPL License (ZeroMQ). For more information about this, consult the pubsub/README.md file.")
option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index 58fbd92..41a3406 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -29,8 +29,9 @@ set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES}
# UDP Multicast
add_celix_container(pubsub_publisher_udp_mc
- GROUP pubsub
- BUNDLES
+ GROUP pubsub
+ BUNDLES
+ Celix::log_service
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
@@ -39,12 +40,17 @@ add_celix_container(pubsub_publisher_udp_mc
Celix::pubsub_admin_udp_multicast
celix_pubsub_poi_publisher
celix_pubsub_poi_publisher2
- )
+ PROPERTIES
+ PSA_UDPMC_VERBOSE=true
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+)
target_link_libraries(pubsub_publisher_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
-add_celix_container("pubsub_subscriber_udp_mc"
- GROUP "pubsub"
- BUNDLES
+add_celix_container(pubsub_subscriber_udp_mc
+ GROUP "pubsub"
+ BUNDLES
+ Celix::log_service
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
@@ -52,11 +58,18 @@ add_celix_container("pubsub_subscriber_udp_mc"
Celix::pubsub_topology_manager
Celix::pubsub_admin_udp_multicast
celix_pubsub_poi_subscriber
- )
+ PROPERTIES
+ PSA_UDPMC_VERBOSE=true
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+)
target_link_libraries(pubsub_subscriber_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
-add_celix_container("pubsub_subscriber2_udp_mc"
- GROUP "pubsub"
- BUNDLES
+
+
+add_celix_container(pubsub_subscriber2_udp_mc
+ GROUP "pubsub"
+ BUNDLES
+ Celix::log_service
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
@@ -64,8 +77,13 @@ add_celix_container("pubsub_subscriber2_udp_mc"
Celix::pubsub_topology_manager
Celix::pubsub_admin_udp_multicast
celix_pubsub_poi_subscriber
- )
+ PROPERTIES
+ PSA_UDPMC_VERBOSE=true
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+)
target_link_libraries(pubsub_subscriber2_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
if (ETCD_CMD AND XTERM_CMD)
#Runtime starting a publish and subscriber for udp mc
add_celix_runtime(pubsub_rt_upd_mc
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
index 56ec678..c27ad38 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
+++ b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
@@ -16,18 +16,11 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * pubsub_publisher_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
#ifndef PUBSUB_PUBLISHER_PRIVATE_H_
#define PUBSUB_PUBLISHER_PRIVATE_H_
-#include <celixbool.h>
+#include "celix_api.h"
#include <pthread.h>
#include "pubsub/publisher.h"
@@ -53,8 +46,8 @@ void publisher_stop(pubsub_sender_pt client);
void publisher_destroy(pubsub_sender_pt client);
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
+void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props);
+void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props);
#endif /* PUBSUB_PUBLISHER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index 0da3ffc..617163c 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -16,21 +16,12 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * ps_pub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
#include <sys/cdefs.h>
#include <stdlib.h>
#include <string.h>
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
+#include "celix_api.h"
#include "pubsub/publisher.h"
#include "pubsub_publisher_private.h"
@@ -47,48 +38,27 @@ struct publisherActivator {
array_list_pt trackerList;//List<service_tracker_pt>
};
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-
- const char* fwUUID = NULL;
-
- bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
+static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx) {
+ const char *fwUUID = celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+ if (fwUUID==NULL) {
printf("PUBLISHER: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
}
- struct publisherActivator * act = malloc(sizeof(*act));
-
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
- arrayList_create(&(act->trackerList));
- act->client = publisher_create(act->trackerList,fwUUID,bundleId);
- *userData = act;
+ bundle_t *bnd = celix_bundleContext_getBundle(ctx);
+ long bundleId = celix_bundle_getId(bnd);
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
- struct publisherActivator * act = (struct publisherActivator *) userData;
+ act->trackerList = celix_arrayList_create();
+ act->client = publisher_create(act->trackerList, fwUUID, bundleId);
int i;
char filter[128];
for(i=0; PUB_TOPICS[i] != NULL; i++){
const char* topic = PUB_TOPICS[i];
-
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- service_tracker_pt tracker = NULL;
memset(filter,0,128);
#ifdef USE_SCOPE
- char *scope;
+ char *scope;
asprintf(&scope, "my_scope_%d", i);
snprintf(filter, 128, "(&(&(%s=%s)(%s=%s))(%s=%s))",
(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME,
@@ -96,52 +66,36 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
PUBLISHER_SCOPE, scope);
free(scope);
#else
- snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, topic);
+ snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic);
#endif
- service_tracker_customizer_pt customizer = NULL;
- serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
- serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
- arrayList_add(act->trackerList,tracker);
-
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.callbackHandle = act->client;
+ opts.addWithProperties = publisher_publishSvcAdded;
+ opts.removeWithProperties = publisher_publishSvcRemoved;
+ opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+ opts.filter.filter = filter;
+ opts.filter.ignoreServiceLanguage = true;
+ long trackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+ celix_arrayList_addLong(act->trackerList,trackerId);
}
publisher_start(act->client);
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_open(tracker);
- }
-
- return CELIX_SUCCESS;
+ return 0;
}
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
+static int pub_stop(struct publisherActivator *act, celix_bundle_context_t *ctx) {
+ for (int i=0; i < arrayList_size(act->trackerList); i++) {
+ long trkId = celix_arrayList_getLong(act->trackerList,i);
+ celix_bundleContext_stopTracker(ctx, trkId);
+ }
+ publisher_stop(act->client);
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_close(tracker);
- }
- publisher_stop(act->client);
+ publisher_destroy(act->client);
+ celix_arrayList_destroy(act->trackerList);
- return CELIX_SUCCESS;
+ return 0;
}
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_destroy(tracker);
- }
-
- publisher_destroy(act->client);
- arrayList_destroy(act->trackerList);
-
- free(act);
- printf("PUBLISHER: bundleActivator_destroy\n");
- return CELIX_SUCCESS;
-}
+CELIX_GEN_BUNDLE_ACTIVATOR(struct publisherActivator, pub_start, pub_stop)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 9a7aedc..5369a22 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -134,28 +134,25 @@ void publisher_destroy(pubsub_sender_pt client) {
free(client);
}
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
+void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) {
+ pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)svc;
pubsub_sender_pt manager = (pubsub_sender_pt)handle;
printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
- const char *value = NULL;
- serviceReference_getProperty(reference, PUBSUB_PUBLISHER_TOPIC, &value);
data->service = publish_svc;
data->publisher = manager;
- data->topic = value;
+ data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
celix_thread_t *tid = malloc(sizeof(*tid));
stop=false;
celixThread_create(tid,NULL,send_thread,(void*)data);
hashMap_put(manager->tid_map, publish_svc, tid);
- return CELIX_SUCCESS;
}
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
+void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) {
pubsub_sender_pt manager = (pubsub_sender_pt)handle;
- celix_thread_t *tid = hashMap_get(manager->tid_map, service);
+ celix_thread_t *tid = hashMap_get(manager->tid_map, svc);
#if defined(__APPLE__) && defined(__MACH__)
uint64_t threadid;
@@ -168,5 +165,4 @@ celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt r
stop=true;
celixThread_join(*tid,NULL);
free(tid);
- return CELIX_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index d90d605..94ce5cc 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -23,9 +23,11 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast
GROUP "Celix/PubSub"
SOURCES
src/psa_activator.c
- src/pubsub_admin_impl.c
- src/topic_subscription.c
- src/topic_publication.c
+ src/pubsub_udpmc_admin.c
+ src/pubsub_udpmc_topic_sender.c
+ #src/pubsub_admin_impl.c
+ #src/topic_subscription.c
+ #src/topic_publication.c
src/large_udp.c
)
target_include_directories(celix_pubsub_admin_udp_multicast PRIVATE
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index cd4ee07..406720e 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -16,126 +16,83 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * psa_activator.c
- *
- * \date Sep 30, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-#include "service_registration.h"
-#include "service_tracker.h"
-
-#include "pubsub_admin_impl.h"
-struct activator {
- pubsub_admin_pt admin;
- pubsub_admin_service_pt adminService;
- service_registration_pt registration;
- service_tracker_pt serializerTracker;
-};
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator;
+#include <stdlib.h>
- activator = calloc(1, sizeof(*activator));
- if (!activator) {
- status = CELIX_ENOMEM;
- }
- else{
- *userData = activator;
-
- status = pubsubAdmin_create(context, &(activator->admin));
-
- if(status == CELIX_SUCCESS){
- service_tracker_customizer_pt customizer = NULL;
- status = serviceTrackerCustomizer_create(activator->admin,
- NULL,
- pubsubAdmin_serializerAdded,
- NULL,
- pubsubAdmin_serializerRemoved,
- &customizer);
- if(status == CELIX_SUCCESS){
- status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
- if(status != CELIX_SUCCESS){
- serviceTrackerCustomizer_destroy(customizer);
- pubsubAdmin_destroy(activator->admin);
- }
- }
- else{
- pubsubAdmin_destroy(activator->admin);
- }
- }
- }
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
- return status;
-}
+#include "pubsub_admin.h"
+#include "pubsub_udpmc_admin.h"
+#include "../../../shell/shell/include/command.h"
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
- pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc));
+typedef struct psa_udpmc_activator {
+ log_helper_t *logHelper;
- if (!pubsubAdminSvc) {
- status = CELIX_ENOMEM;
- }
- else{
- pubsubAdminSvc->admin = activator->admin;
+ pubsub_udpmc_admin_t *admin;
- pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
- pubsubAdminSvc->removePublication = pubsubAdmin_removePublication;
+ pubsub_admin_service_t adminService;
+ long adminSvcId;
- pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
- pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription;
+ command_service_t cmdSvc;
+ long cmdSvcId;
+} psa_udpmc_activator_t;
- pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
- pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
+ act->adminSvcId = -1L;
+ act->cmdSvcId = -1L;
- pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
+ logHelper_create(ctx, &act->logHelper);
+ logHelper_start(act->logHelper);
- activator->adminService = pubsubAdminSvc;
+ act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper);
+ celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
+ //register pubsub admin service
+ if (status == CELIX_SUCCESS) {
+ pubsub_admin_service_t *psaSvc = &act->adminService;
+ psaSvc->handle = act->admin;
+ psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
+ psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
+ psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+ psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
+ psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
+ psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReciever;
+ psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReciever;
+ psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
+ psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
- status += serviceTracker_open(activator->serializerTracker);
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);
+ act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
}
+ //register shell command service
+ {
+ act->cmdSvc.handle = act->admin;
+ act->cmdSvc.executeCommand = pubsub_udpmcAdmin_executeCommand;
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc");
+ celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA");
+ celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+ }
return status;
}
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- status += serviceTracker_close(activator->serializerTracker);
- status += serviceRegistration_unregister(activator->registration);
-
- activator->registration = NULL;
-
- free(activator->adminService);
- activator->adminService = NULL;
-
- return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- serviceTracker_destroy(activator->serializerTracker);
- pubsubAdmin_destroy(activator->admin);
- activator->admin = NULL;
+int psa_udpmc_stop(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
+ celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+ celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+ pubsub_udpmcAdmin_destroy(act->admin);
- free(activator);
+ logHelper_stop(act->logHelper);
+ logHelper_destroy(&act->logHelper);
- return status;
+ return CELIX_SUCCESS;
}
-
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_udpmc_activator_t, psa_udpmc_start, psa_udpmc_stop);
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
index 0638efb..2f62f6f 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -16,13 +16,6 @@
*specific language governing permissions and limitations
*under the License.
*/
-/*
- * pubsub_admin_impl.c
- *
- * \date Sep 30, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
#include <stdio.h>
#include <stdlib.h>
@@ -61,20 +54,20 @@
#include "topic_publication.h"
#include "pubsub_endpoint.h"
#include "pubsub/subscriber.h"
-#include "pubsub_admin_match.h"
+#include "pubsub_utils.h"
static const char *DEFAULT_MC_IP = "224.100.1.1";
static char *DEFAULT_MC_PREFIX = "224.100";
static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t *admin,celix_properties_t *subEP);
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t *admin,celix_properties_t *subEP);
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType);
-static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, celix_properties_t *ep, pubsub_serializer_service_t **out, const char **serType);
+static void connectTopicPubSubToSerializer(pubsub_admin_t *admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void *topicPubSub,bool isPublication);
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin) {
celix_status_t status = CELIX_SUCCESS;
*admin = calloc(1, sizeof(**admin));
@@ -239,7 +232,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
}
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin)
{
celix_status_t status = CELIX_SUCCESS;
@@ -325,7 +318,7 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
return status;
}
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t *admin,celix_properties_t *subEP){
celix_status_t status = CELIX_SUCCESS;
celixThreadMutex_lock(&admin->subscriptionsLock);
@@ -342,7 +335,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
else{
if (admin->verbose) {
printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
}
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -362,9 +355,9 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
if(topic_publishers!=NULL){
for(i=0;i<arrayList_size(topic_publishers);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ celix_properties_t *pubEP = arrayList_get(topic_publishers,i);
+ if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
}
arrayList_destroy(topic_publishers);
@@ -380,9 +373,9 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
if(ext_pub_list!=NULL){
for(i=0;i<arrayList_size(ext_pub_list);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ celix_properties_t *pubEP = arrayList_get(ext_pub_list,i);
+ if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
}
}
@@ -409,10 +402,10 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
return status;
}
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_t *admin, celix_properties_t *subEP){
celix_status_t status = CELIX_SUCCESS;
- if(strcmp(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
+ if(strcmp(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
return pubsubAdmin_addAnySubscription(admin,subEP);
}
@@ -422,7 +415,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
celixThreadMutex_lock(&admin->localPublicationsLock);
celixThreadMutex_lock(&admin->externalPublicationsLock);
- char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -437,11 +430,11 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
pubsub_serializer_service_t *best_serializer = NULL;
const char *serType = NULL;
if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
- status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
+ status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
} else {
if (admin->verbose) {
printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
}
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -450,6 +443,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
}
if (status==CELIX_SUCCESS){
+ //got type and serializer -> update endpoint
+ celix_properties_set(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+ celix_properties_set(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType);
/* Try to connect internal publishers */
if(factory!=NULL){
@@ -458,9 +454,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
if(topic_publishers!=NULL){
for(i=0;i<arrayList_size(topic_publishers);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ celix_properties_t *pubEP = arrayList_get(topic_publishers,i);
+ if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
}
arrayList_destroy(topic_publishers);
@@ -471,9 +467,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
/* Look also for external publishers */
if(ext_pub_list!=NULL){
for(i=0;i<arrayList_size(ext_pub_list);i++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
- status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ celix_properties_t *pubEP = arrayList_get(ext_pub_list,i);
+ if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+ status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
}
}
@@ -505,38 +501,38 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
if (admin->verbose) {
printf("[PSA_UDPMC] Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
- properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(subEP, PUBSUB_ENDPOINT_UUID),
+ properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
- properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
- properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
- printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(subEP, PUBSUB_ENDPOINT_TYPE));
+ printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
return status;
}
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_t *admin,celix_properties_t *subEP){
celix_status_t status = CELIX_SUCCESS;
if (admin->verbose) {
printf("[PSA_UDPMC] Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
- properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(subEP, PUBSUB_ENDPOINT_UUID),
+ properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
- properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
- properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
- properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
- printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(subEP, PUBSUB_ENDPOINT_TYPE));
+ printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
- char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->subscriptionsLock);
topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -565,7 +561,7 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
}
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_t *admin,celix_properties_t *pubEP){
celix_status_t status = CELIX_SUCCESS;
const char* fwUUID = NULL;
@@ -575,22 +571,22 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
return CELIX_INVALID_BUNDLE_CONTEXT;
}
- const char *epFwUUID = properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+ const char *epFwUUID = properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
if (isOwn) {
//should be null, willl be set in this call
- assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) == NULL);
- assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+ assert(properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+ assert(properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
}
if (isOwn) {
- properties_set(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+ properties_set(pubEP, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
}
- char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
- if ((strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) {
+ if ((strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) {
celixThreadMutex_lock(&admin->localPublicationsLock);
@@ -603,11 +599,11 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub);
if (isOwn) {
- properties_set(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+ properties_set(pubEP, PUBSUB_SERIALIZER_TYPE_KEY, serType);
}
} else {
printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
arrayList_add(admin->noSerializerPublications,pubEP);
@@ -622,8 +618,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
}
} else {
printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s.\n",
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
}
} else {
//just add the new EP to the list
@@ -655,7 +651,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
int i;
for (i = 0; i < arrayList_size(pendingSubList); i++) {
- pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
+ celix_properties_t *subEP = arrayList_get(pendingSubList, i);
pubsubAdmin_addSubscription(admin, subEP);
}
hashMap_remove(admin->pendingSubscriptions, scope_topic);
@@ -670,14 +666,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
celixThreadMutex_lock(&admin->subscriptionsLock);
topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
- if (sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
- pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ if (sub != NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
+ pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
- if (any_sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
- pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ if (any_sub != NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
+ pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
free(scope_topic);
@@ -686,16 +682,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
if (admin->verbose) {
printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP, PUBSUB_ENDPOINT_UUID),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
- properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
- properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
+ properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TYPE));
printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
- properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),
+ properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),
isOwn);
}
@@ -703,21 +699,21 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
}
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_t *admin,celix_properties_t *pubEP){
celix_status_t status = CELIX_SUCCESS;
int count = 0;
if (admin->verbose) {
- printf("PSA_UDPMC: Adding publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ printf("PSA_UDPMC: Remove publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+ properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+ properties_get(pubEP, PUBSUB_ENDPOINT_UUID),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
- properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
- properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
- properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
- printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY),
+ properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY),
+ properties_get(pubEP, PUBSUB_ENDPOINT_TYPE));
+ printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
const char* fwUUID = NULL;
@@ -727,9 +723,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
}
- char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
- if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+ if(strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
celixThreadMutex_lock(&admin->localPublicationsLock);
service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -757,7 +753,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
int i;
bool found = false;
for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
- pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+ celix_properties_t *p = arrayList_get(ext_pub_list,i);
found = pubsubEndpoint_equals(pubEP,p);
if (found){
arrayList_remove(ext_pub_list,i);
@@ -765,8 +761,8 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
}
// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
for(i=0; i<arrayList_size(ext_pub_list);i++) {
- pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- if (strcmp(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) {
+ celix_properties_t *p = arrayList_get(ext_pub_list,i);
+ if (strcmp(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) {
count++;
}
}
@@ -788,14 +784,14 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
celixThreadMutex_lock(&admin->subscriptionsLock);
topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- if(sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
- pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ if(sub!=NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
+ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
/* And check also for ANY subscription */
topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
- if(any_sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
- pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+ if(any_sub!=NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
+ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
}
free(scope_topic);
@@ -805,7 +801,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
}
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_t *admin,char *scope, char* topic){
celix_status_t status = CELIX_SUCCESS;
printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic);
@@ -831,7 +827,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop
}
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_t *admin,char *scope, char* topic){
celix_status_t status = CELIX_SUCCESS;
printf("PSA_UDP_MC: Closing all subscriptions\n");
@@ -891,10 +887,10 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
}
#endif
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t *admin,celix_properties_t *subEP){
celix_status_t status = CELIX_SUCCESS;
- char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+ char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
if(pendingListPerTopic==NULL){
arrayList_create(&pendingListPerTopic);
@@ -921,7 +917,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
return CELIX_SERVICE_EXCEPTION;
}
- pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+ pubsub_admin_t *admin = (pubsub_admin_t*)handle;
celixThreadMutex_lock(&admin->serializerListLock);
arrayList_add(admin->serializerList, reference);
celixThreadMutex_unlock(&admin->serializerListLock);
@@ -930,7 +926,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
- pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+ celix_properties_t *ep = arrayList_get(admin->noSerializerSubscriptions,i);
pubsub_serializer_service_t *best_serializer = NULL;
pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
if(best_serializer != NULL){ /* Finally we have a valid serializer! */
@@ -939,7 +935,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
}
for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
- pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+ celix_properties_t *ep = arrayList_get(admin->noSerializerPublications,i);
pubsub_serializer_service_t *best_serializer = NULL;
pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
if(best_serializer != NULL){ /* Finally we have a valid serializer! */
@@ -958,7 +954,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
- pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+ pubsub_admin_t *admin = (pubsub_admin_t*)handle;
int i=0, j=0;
const char *serType = NULL;
@@ -987,12 +983,12 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
/* Get the endpoints that are going to be orphan */
array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
for(j=0;j<arrayList_size(pubList);j++){
- pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
+ celix_properties_t *pubEP = arrayList_get(pubList,j);
/* Remove the publication */
pubsubAdmin_removePublication(admin, pubEP);
/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
- if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
- properties_unset(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
+ if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
+ properties_unset(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
}
/* Add the orphan endpoint to the noSerializer pending list */
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1038,12 +1034,12 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
/* Get the endpoints that are going to be orphan */
array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
for(j=0;j<arrayList_size(subList);j++){
- pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
+ celix_properties_t *subEP = arrayList_get(subList,j);
/* Remove the subscription */
pubsubAdmin_removeSubscription(admin, subEP);
/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
- if(properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
- properties_unset(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
+ if(properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
+ properties_unset(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
}
/* Add the orphan endpoint to the noSerializer pending list */
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1084,24 +1080,10 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
return CELIX_SUCCESS;
}
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
- celix_status_t status = CELIX_SUCCESS;
-
- const char *fwUuid = NULL;
- bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
- if (fwUuid == NULL) {
- return CELIX_ILLEGAL_STATE;
- }
-
- celixThreadMutex_lock(&admin->serializerListLock);
- status = pubsub_admin_match(endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList,score);
- celixThreadMutex_unlock(&admin->serializerListLock);
-
- return status;
-}
/* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType){
+/*
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, celix_properties_t *ep, pubsub_serializer_service_t **out, const char **serType){
celix_status_t status = CELIX_SUCCESS;
pubsub_serializer_service_t *serSvc = NULL;
@@ -1122,9 +1104,9 @@ static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsu
*out = serSvc;
return status;
-}
+}*/
-static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+static void connectTopicPubSubToSerializer(pubsub_admin_t *admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
celixThreadMutex_lock(&admin->usedSerializersLock);
@@ -1140,7 +1122,7 @@ static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializ
}
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void *topicPubSub,bool isPublication){
celixThreadMutex_lock(&admin->usedSerializersLock);
@@ -1157,3 +1139,83 @@ static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topi
celixThreadMutex_unlock(&admin->usedSerializersLock);
}
+/**
+ * Scores how well this UDP-MC pubsub admin matches a publisher request.
+ *
+ * Delegates to pubsub_utils_matchPublisher(), passing the admin bundle
+ * context, the requesting bundle id, the filter string, the UDP-MC admin
+ * type, the three configured scores and the serializer list. The call is
+ * made while admin->serializerListLock is held.
+ *
+ * @param handle              Used as the pubsub_admin_t instance (not NULL-checked).
+ * @param svcRequesterBndId   Forwarded unchanged to the match helper.
+ * @param svcFilter           Dereferenced for filterStr without a NULL check.
+ * @param outScore            Optional; receives the score when not NULL.
+ * @param outSerializerSvcId  Optional; receives the serializer service id
+ *                            reported by the helper (initialised to -1L
+ *                            before the call) when not NULL.
+ * @return Always CELIX_SUCCESS; status is never reassigned.
+ */
+celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ celixThreadMutex_lock(&admin->serializerListLock); /* serializerList is handed to the helper under this lock */
+ long serializerSvcId = -1L; /* value reported to the caller if the helper does not overwrite it */
+ double score = pubsub_utils_matchPublisher(admin->bundle_context, svcRequesterBndId, svcFilter->filterStr, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+ if (outScore != NULL) {
+ *outScore = score;
+ }
+ if (outSerializerSvcId != NULL) {
+ *outSerializerSvcId = serializerSvcId;
+ }
+ return status;
+}
+/**
+ * Scores how well this UDP-MC pubsub admin matches a subscriber service.
+ *
+ * Delegates to pubsub_utils_matchSubscriber(), passing the admin bundle
+ * context, the providing bundle id, the service properties, the UDP-MC admin
+ * type, the three configured scores and the serializer list. The call is
+ * made while admin->serializerListLock is held.
+ *
+ * @param handle              Used as the pubsub_admin_t instance (not NULL-checked).
+ * @param svcProviderBndId    Forwarded unchanged to the match helper.
+ * @param svcProperties       Forwarded unchanged to the match helper.
+ * @param outScore            Optional; receives the score when not NULL.
+ * @param outSerializerSvcId  Optional; receives the serializer service id
+ *                            reported by the helper (initialised to -1L
+ *                            before the call) when not NULL.
+ * @return Always CELIX_SUCCESS; status is never reassigned.
+ */
+celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ celixThreadMutex_lock(&admin->serializerListLock); /* serializerList is handed to the helper under this lock */
+ long serializerSvcId = -1L; /* value reported to the caller if the helper does not overwrite it */
+ double score = pubsub_utils_matchSubscriber(admin->bundle_context, svcProviderBndId, svcProperties, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+ if (outScore != NULL) {
+ *outScore = score;
+ }
+ if (outSerializerSvcId != NULL) {
+ *outSerializerSvcId = serializerSvcId;
+ }
+ return status;
+}
+/**
+ * Scores how well this UDP-MC pubsub admin matches an endpoint.
+ *
+ * Delegates to pubsub_utils_matchEndpoint(), passing the admin bundle
+ * context, the endpoint properties, the UDP-MC admin type, the three
+ * configured scores and the serializer list. The call is made while
+ * admin->serializerListLock is held.
+ *
+ * @param handle    Used as the pubsub_admin_t instance (not NULL-checked).
+ * @param endpoint  Forwarded unchanged to the match helper.
+ * @param outScore  Optional; receives the score when not NULL.
+ * @return Always CELIX_SUCCESS; status is never reassigned.
+ */
+celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ celixThreadMutex_lock(&admin->serializerListLock); /* serializerList is handed to the helper under this lock */
+ long serializerSvcId = -1L; /* passed to the helper as an out argument; not reported to the caller here */
+ double score = pubsub_utils_matchEndpoint(admin->bundle_context, endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId);
+ celixThreadMutex_unlock(&admin->serializerListLock);
+ if (outScore != NULL) {
+ *outScore = score;
+ }
+ return status;
+}
+/**
+ * Skeleton for setting up a topic sender; no setup logic is implemented yet.
+ *
+ * scope, topic, serializerSvcId and publisherEndpoint are currently ignored,
+ * and nothing is written through publisherEndpoint.
+ *
+ * @param handle Assigned to a local pubsub_admin_t pointer that is not used further.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) { /* Skeleton: no teardown logic yet; scope and topic are ignored. */
+ pubsub_admin_t *admin = handle; /* handle is taken as the pubsub_admin_t instance; the local is not used further */
+ celix_status_t status = CELIX_SUCCESS;
+ return status; /* always CELIX_SUCCESS */
+}
+/**
+ * Skeleton for setting up a topic receiver; no setup logic is implemented yet.
+ *
+ * scope, topic, serializerSvcId and subscriberEndpoint are currently ignored,
+ * and nothing is written through subscriberEndpoint.
+ *
+ * @param handle Assigned to a local pubsub_admin_t pointer that is not used further.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+/**
+ * Skeleton for tearing down a topic receiver; no teardown logic is
+ * implemented yet and scope and topic are currently ignored.
+ *
+ * @param handle Assigned to a local pubsub_admin_t pointer that is not used further.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+/**
+ * Skeleton for handling an added endpoint; the endpoint properties are
+ * currently ignored and no state is changed.
+ *
+ * @param handle Assigned to a local pubsub_admin_t pointer that is not used further.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
+/**
+ * Skeleton for handling a removed endpoint; the endpoint properties are
+ * currently ignored and no state is changed.
+ *
+ * @param handle Assigned to a local pubsub_admin_t pointer that is not used further.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+ pubsub_admin_t *admin = handle;
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
index b82e8a1..7618a6e 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -32,9 +32,10 @@
#include "log_helper.h"
+#define PUBSUB_PSA_UDPMC_PSA_TYPE "udp_mc"
#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "pubsub.udpmc.socket_address"
-struct pubsub_admin {
+typedef struct pubsub_admin {
bundle_context_pt bundle_context;
log_helper_pt loghelper;
@@ -77,24 +78,34 @@ struct pubsub_admin {
double defaultScore;
bool verbose;
-};
+} pubsub_admin_t;
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin);
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+void pubsubAdmin_addSerializer(void * handle, void *svc, const celix_properties_t *properties);
+void pubsubAdmin_removeSerializer(void * handle, void *svc, const celix_properties_t *properties);
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic);
-celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+//for the pubsub_admin_service
+
+celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
+celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
+
+//note endpoint is owned by caller
+celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+//note endpoint is owned by caller
+celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+
#endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
new file mode 100644
index 0000000..650e439
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -0,0 +1,335 @@
+
+#include <errno.h>
+#include <memory.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <pubsub_endpoint.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_udpmc_admin.h"
+#include "pubsub_psa_udpmc_constants.h"
+#include "pubsub_udpmc_topic_sender.h"
+
+//Multicast address used when no address could be configured or derived.
+#define PUBSUB_UDPMC_MC_IP_DEFAULT "224.100.1.1"
+
+/*
+ * Logging shorthands. Each expands to a logHelper_log() call on psa->log, so a
+ * variable named `psa` must be in scope wherever they are used.
+ * None of the macros ends in a semicolon: the caller supplies it, so every macro
+ * behaves as exactly one statement (also inside an unbraced if/else).
+ */
+#define LOG_DEBUG(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define LOG_INFO(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define LOG_WARN(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define LOG_ERROR(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+
+//Looks up an IPv4 address for the given interface name; defined at the end of this file.
+static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
+
+/**
+ * Creates the UDP multicast pubsub admin.
+ *
+ * Reads its configuration from the bundle context properties, determines the
+ * interface IP and multicast IP, opens the UDP send socket and creates the
+ * (initially empty) topic sender, topic receiver and connected endpoint maps.
+ *
+ * The multicast IP is taken from PUBSUB_UDPMC_IP_KEY when configured. Otherwise
+ * it is built from the multicast prefix and the last two octets of the
+ * interface IP.
+ *
+ * @param ctx       The bundle context; stored in the admin, not owned.
+ * @param logHelper The log helper used by the LOG_* macros; stored, not owned.
+ * @return The new admin (release with pubsub_udpmcAdmin_destroy) or NULL when
+ *         the admin itself could not be allocated. A failing socket setup is
+ *         logged and leaves sendSocket at -1; it does not fail the creation.
+ */
+pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+    pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa));
+    if (psa == NULL) {
+        return NULL;
+    }
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->sendSocket = -1; //0 is a valid descriptor, so mark "no socket" explicitly
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_UDPMC_VERBOSE_KEY, PUBSUB_UDPMC_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
+
+    char *mc_ip = NULL;
+    char *if_ip = NULL;
+
+    const char *mcIpProp = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_IP_KEY, NULL);
+    if (mcIpProp != NULL) {
+        mc_ip = strdup(mcIpProp);
+    }
+
+    const char *mc_prefix = celix_bundleContext_getProperty(ctx,
+            PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY,
+            PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
+    const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL);
+    if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
+        //interface is optional; never hand a NULL pointer to a %s conversion
+        LOG_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface != NULL ? interface : "(null)");
+    } else if (psa->verbose) {
+        LOG_INFO("[PSA_UDPMC] Using IP address %s", if_ip);
+    }
+
+    if (mc_ip == NULL) {
+        //No multicast IP configured: derive one from the prefix and the interface IP.
+        //Deriving unconditionally would leak and silently override the configured value.
+        if (if_ip != NULL && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
+            LOG_WARN("[PSA_UDPMC] Could not parse IP address %s", if_ip);
+            b2 = 1;
+            b3 = 1;
+        }
+        if (asprintf(&mc_ip, "%s.%d.%d", mc_prefix, b2, b3) < 0) {
+            mc_ip = NULL; //content is undefined after a failed asprintf; use the default below
+        }
+    }
+
+    //Resolve the interface address before the socket setup, so the setup never sees NULL.
+    if (if_ip != NULL) {
+        psa->ifIpAddress = if_ip;
+    } else {
+        psa->ifIpAddress = strdup("127.0.0.1");
+    }
+
+    int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+    if (sendSocket == -1) {
+        LOG_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno));
+    } else {
+        char loop = 1;
+        int rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
+        if (rc != 0) {
+            LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno));
+        }
+        if (rc == 0) {
+            struct in_addr multicast_interface;
+            if (psa->ifIpAddress == NULL || inet_aton(psa->ifIpAddress, &multicast_interface) == 0) {
+                LOG_ERROR("[PSA_UDPMC] Invalid interface IP address, cannot select multicast interface");
+                rc = -1;
+            } else {
+                rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface));
+                if (rc != 0) {
+                    LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
+                }
+            }
+        }
+        if (rc == 0) {
+            psa->sendSocket = sendSocket;
+        } else {
+            close(sendSocket); //do not leak a socket that is not going to be used
+        }
+    }
+
+    if (psa->verbose) {
+        LOG_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress);
+    }
+
+    if (mc_ip != NULL) {
+        psa->mcIpAddress = mc_ip;
+    } else {
+        psa->mcIpAddress = strdup(PUBSUB_UDPMC_MC_IP_DEFAULT);
+    }
+    if (psa->verbose) {
+        LOG_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress);
+    }
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_DEFAULT_SCORE_KEY, PSA_UDPMC_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE);
+
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->connectedEndpoints.mutex, NULL);
+    psa->connectedEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    return psa;
+}
+
+/**
+ * Destroys an admin created with pubsub_udpmcAdmin_create.
+ *
+ * Closes the send socket, destroys the three mutex/map pairs (the maps free
+ * their keys, not their values) and frees the IP address strings and the admin
+ * itself. Passing NULL is allowed and does nothing.
+ *
+ * @param psa The admin to destroy, may be NULL.
+ */
+void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
+    if (psa == NULL) {
+        return;
+    }
+
+    //note: assuming all psa registered services and service trackers are removed.
+
+    //The admin is calloc'ed and the descriptor is only stored after a successful
+    //socket setup, so a value <= 0 means that no socket was stored.
+    if (psa->sendSocket > 0) {
+        close(psa->sendSocket);
+    }
+
+    celixThreadMutex_destroy(&psa->topicSenders.mutex);
+    hashMap_destroy(psa->topicSenders.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+    hashMap_destroy(psa->topicReceivers.map, true, false);
+
+    celixThreadMutex_destroy(&psa->connectedEndpoints.mutex);
+    hashMap_destroy(psa->connectedEndpoints.map, true, false);
+
+    free(psa->mcIpAddress);
+    free(psa->ifIpAddress);
+    free(psa);
+}
+
+/**
+ * Determines the match score of this admin for a requested publisher service.
+ *
+ * Delegates to pubsub_utils_matchPublisher with the admin type and the
+ * configured QoS/default scores.
+ *
+ * @param handle             The pubsub_udpmc_admin_t instance.
+ * @param svcRequesterBndId  Id of the bundle requesting the publisher service.
+ * @param svcFilter          Filter of the request; must not be NULL, its filterStr is read.
+ * @param outScore           Receives the score; may be NULL (same as the other match functions).
+ * @param outSerializerSvcId Passed on to pubsub_utils_matchPublisher.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+    pubsub_udpmc_admin_t *psa = handle;
+    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
+    celix_status_t status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+    if (outScore != NULL) {
+        *outScore = score;
+    }
+
+    return status;
+}
+
+/**
+ * Determines the match score of this admin for a provided subscriber service.
+ *
+ * Delegates to pubsub_utils_matchSubscriber with the admin type and the
+ * configured QoS/default scores.
+ *
+ * @param handle             The pubsub_udpmc_admin_t instance.
+ * @param svcProviderBndId   Id of the bundle providing the subscriber service.
+ * @param svcProperties      Properties of the subscriber service.
+ * @param outScore           Receives the score; may be NULL.
+ * @param outSerializerSvcId Passed on to pubsub_utils_matchSubscriber.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_udpmc_admin_t *psa = handle;
+    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
+
+    const double matchScore = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties,
+            PUBSUB_UDPMC_ADMIN_TYPE, psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+            outSerializerSvcId);
+
+    if (outScore == NULL) {
+        return CELIX_SUCCESS;
+    }
+    *outScore = matchScore;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Determines the match score of this admin for a discovered endpoint.
+ *
+ * Delegates to pubsub_utils_matchEndpoint with the admin type and the
+ * configured QoS/default scores; no serializer service id is requested.
+ *
+ * @param handle   The pubsub_udpmc_admin_t instance.
+ * @param endpoint Properties describing the endpoint.
+ * @param outScore Receives the score; may be NULL.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) {
+    pubsub_udpmc_admin_t *psa = handle;
+    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
+
+    const double matchScore = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE,
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, NULL);
+
+    if (outScore == NULL) {
+        return CELIX_SUCCESS;
+    }
+    *outScore = matchScore;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates and stores a topic sender for the given scope/topic.
+ *
+ * Steps:
+ *  1) create the TopicSender
+ *  2) store the TopicSender under its scope/topic key
+ *  3) connect existing endpoints (TODO)
+ *  4) hand out the publisher endpoint
+ *
+ * Nothing is stored and nothing is handed out when a sender for the scope/topic
+ * already exists or when no valid publisher endpoint can be created; both cases
+ * are logged as errors.
+ *
+ * @param handle               The pubsub_udpmc_admin_t instance.
+ * @param scope                Scope of the topic.
+ * @param topic                Name of the topic.
+ * @param serializerSvcId      Service id of the serializer the sender should use.
+ * @param outPublisherEndpoint Receives the created endpoint, which is then owned by
+ *                             the caller. Only written on success. May be NULL, in
+ *                             which case the created endpoint is destroyed again.
+ * @return Always CELIX_SUCCESS (failures are only logged).
+ */
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+    pubsub_udpmc_admin_t *psa = handle;
+    celix_status_t status = CELIX_SUCCESS;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+    if (sender == NULL) {
+        bool valid = false;
+        sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId);
+        if (sender != NULL) {
+            const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
+            const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                    PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            valid = pubsubEndpoint_isValid(newEndpoint, true, true);
+        }
+        if (!valid) {
+            LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
+            if (newEndpoint != NULL) {
+                celix_properties_destroy(newEndpoint);
+                newEndpoint = NULL; //must not be handed to the caller below
+            }
+            pubsub_udpmcTopicSender_destroy(sender);
+            free(key);
+        } else {
+            //the map takes over the key; it is freed again in teardownTopicSender
+            hashMap_put(psa->topicSenders.map, key, sender);
+            //TODO connect endpoints to sender
+        }
+    } else {
+        free(key);
+        LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    if (newEndpoint != NULL) {
+        if (outPublisherEndpoint != NULL) {
+            *outPublisherEndpoint = newEndpoint;
+        } else {
+            //nobody takes ownership, so do not leak the endpoint
+            celix_properties_destroy(newEndpoint);
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Removes and destroys the topic sender stored for the given scope/topic.
+ *
+ * The sender is looked up by its scope/topic key, removed from the map together
+ * with the key string owned by the map, and destroyed. An unknown scope/topic is
+ * logged as an error.
+ *
+ * @param handle The pubsub_udpmc_admin_t instance.
+ * @param scope  Scope of the topic.
+ * @param topic  Name of the topic.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+    pubsub_udpmc_admin_t *psa = handle;
+
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_entry_t *found = hashMap_getEntry(psa->topicSenders.map, lookupKey);
+    if (found == NULL) {
+        LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+    } else {
+        //fetch the key owned by the map before the entry disappears
+        char *storedKey = hashMapEntry_getKey(found);
+        pubsub_updmc_topic_sender_t *removed = hashMap_remove(psa->topicSenders.map, lookupKey);
+        free(storedKey);
+        //TODO disconnect endpoints to sender
+        pubsub_udpmcTopicSender_destroy(removed);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    free(lookupKey);
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Placeholder for setting up a topic receiver: only logs the scope/topic.
+ * serializerSvcId and subscriberEndpoint are not used yet.
+ *
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+    pubsub_udpmc_admin_t *psa = handle; //referenced by the LOG_INFO macro
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_setupTopicReciever. scope/topic: %s/%s", scope, topic);
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Placeholder for tearing down a topic receiver: only logs the scope/topic.
+ *
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) {
+    pubsub_udpmc_admin_t *psa = handle; //referenced by the LOG_INFO macro
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_teardownTopicReciever. scope/topic: %s/%s", scope, topic);
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Placeholder for adding an endpoint: only logs the call, the endpoint is not used yet.
+ *
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_udpmc_admin_t *psa = handle; //referenced by the LOG_INFO macro
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_addEndpoint");
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Placeholder for removing an endpoint: only logs the call, the endpoint is not used yet.
+ *
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_udpmc_admin_t *psa = handle; //referenced by the LOG_INFO macro
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_removeEndpoint");
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Shell command callback: prints an overview of all topic senders to `out`.
+ *
+ * For every sender the scope/topic, psa type, serializer type and url (socket
+ * address) are printed. Values that are not available are printed as "(null)"
+ * instead of passing a NULL pointer to a %s conversion.
+ *
+ * @param handle      The pubsub_udpmc_admin_t instance.
+ * @param commandLine Unused.
+ * @param out         Stream the overview is written to.
+ * @param errStream   Unused.
+ * @return Always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+    pubsub_udpmc_admin_t *psa = handle;
+    celix_status_t status = CELIX_SUCCESS;
+
+    fprintf(out, "\nTopic Senders:\n");
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
+        const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
+        const char *scope = pubsub_udpmcTopicSender_scope(sender);
+        const char *topic = pubsub_udpmcTopicSender_topic(sender);
+        const char *url = pubsub_udpmcTopicSender_socketAddress(sender);
+        fprintf(out, "|- Topic Sender %s/%s\n", scope != NULL ? scope : "(null)", topic != NULL ? topic : "(null)");
+        fprintf(out, " |- psa type = %s\n", psaType != NULL ? psaType : "(null)");
+        fprintf(out, " |- serializer type = %s\n", serType != NULL ? serType : "(null)");
+        fprintf(out, " |- url = %s\n", url != NULL ? url : "(null)");
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    fprintf(out, "\n");
+
+    //TODO topic receivers
+
+    return status;
+}
+
+#ifndef ANDROID
+/**
+ * Looks up the numeric IPv4 address of a network interface.
+ *
+ * Walks the list returned by getifaddrs() and takes the first AF_INET address
+ * whose interface name equals `interface`, or simply the first AF_INET address
+ * when `interface` is NULL.
+ *
+ * @param interface Interface name to look for, or NULL for "first IPv4 address".
+ * @param ip        Receives a newly allocated string with the address on success;
+ *                  the caller must free it. Not written on failure.
+ * @return CELIX_SUCCESS when an address was found and copied, otherwise
+ *         CELIX_BUNDLE_EXCEPTION.
+ */
+static celix_status_t udpmc_getIpAddress(const char* interface, char** ip) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    struct ifaddrs *ifaddr = NULL;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) == -1) {
+        return status;
+    }
+
+    for (struct ifaddrs *ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) {
+        //Only IPv4 entries are described by a struct sockaddr_in, so check the
+        //family before handing the address to getnameinfo with that length.
+        if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET) {
+            continue;
+        }
+        if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0) {
+            continue;
+        }
+        if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0) {
+            continue;
+        }
+
+        char *copy = strdup(host);
+        if (copy != NULL) {
+            //only report success when the caller really gets a string
+            *ip = copy;
+            status = CELIX_SUCCESS;
+        }
+    }
+
+    freeifaddrs(ifaddr);
+
+    return status;
+}
+#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
new file mode 100644
index 0000000..011d272
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -0,0 +1,92 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_UDPMC_ADMIN_H
+#define CELIX_PUBSUB_UDPMC_ADMIN_H
+
+#include "celix_api.h"
+#include "log_helper.h"
+
+//Admin type identifier of the UDP multicast pubsub admin.
+#define PUBSUB_UDPMC_ADMIN_TYPE "udpmc"
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "pubsub.udpmc.socket_address"
+
+//Names of the bundle context properties used to configure the admin.
+#define PUBSUB_UDPMC_IP_KEY "PSA_IP"
+#define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE"
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY "PSA_MC_PREFIX"
+#define PUBSUB_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE"
+
+//Defaults for the properties above. The verbose default is a real boolean: it is
+//passed as the bool default value of celix_bundleContext_getPropertyAsBool(), where
+//a string literal would convert to true whatever its text.
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100"
+#define PUBSUB_UDPMC_VERBOSE_DEFAULT true
+
+
+//State of the UDP multicast pubsub admin; created by pubsub_udpmcAdmin_create and
+//released by pubsub_udpmcAdmin_destroy.
+typedef struct pubsub_udpmc_admin {
+ celix_bundle_context_t *ctx;
+ log_helper_t *log;
+ char* ifIpAddress; // The local interface which is used for multicast communication
+ char* mcIpAddress; // The multicast IP address
+ int sendSocket; // UDP socket; stored by pubsub_udpmcAdmin_create once its multicast options were set successfully
+ double qosSampleScore; // scores handed to the pubsub_utils_match* functions
+ double qosControlScore;
+ double defaultScore;
+ bool verbose; // enables the informational log messages during creation
+ const char *fwUUID; // framework UUID property value; not freed by pubsub_udpmcAdmin_destroy
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = pubsub_updmc_topic_sender_t
+ } topicSenders;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = scope:topic key, value = topic receiver. NOTE(review): receivers are not implemented yet, value type to be confirmed
+ } topicReceivers;
+
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = endpoint uuid, value = psa_udpmc_connected_endpoint_t
+ } connectedEndpoints;
+
+} pubsub_udpmc_admin_t;
+
+//Entry describing an endpoint that is connected to a local sender or receiver.
+typedef struct psa_udpmc_connected_endpoint {
+ void *sender; //if connected endpoint is subscriber. TODO type
+ void *receiver; //if connected endpoint is publisher. TODO type
+ char *endpointUUID;
+} psa_udpmc_connected_endpoint_t;
+
+//Lifecycle: create returns a new admin, destroy releases it (destroy accepts NULL).
+pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
+
+//Matching: handle is the pubsub_udpmc_admin_t; the score is reported through the score out-parameter.
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
+
+//Topic senders are stored per scope/topic; setup hands out the created publisher endpoint through publisherEndpoint.
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+//Topic receivers: currently placeholders that only log.
+celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
+
+//Endpoints: currently placeholders that only log.
+celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+
+//Shell command callback: prints the topic senders to outStream.
+celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+#endif //CELIX_PUBSUB_UDPMC_ADMIN_H
+
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
new file mode 100644
index 0000000..5553d3b
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -0,0 +1,99 @@
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include "pubsub_udpmc_topic_sender.h"
+#include "pubsub_psa_udpmc_constants.h"
+
+//Service tracker callback storing the tracked serializer service; defined at the end of this file.
+static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+
+//State of a single topic sender; created by pubsub_udpmcTopicSender_create.
+struct pubsub_updmc_topic_sender {
+ celix_bundle_context_t *ctx;
+ char *scope; //copy of the scope, freed in pubsub_udpmcTopicSender_destroy
+ char *topic; //copy of the topic, freed in pubsub_udpmcTopicSender_destroy
+ char *socketAddress; //NOTE(review): never assigned in this file, so it is NULL after creation
+
+ long serTrackerId; //id of the serializer service tracker, stopped in pubsub_udpmcTopicSender_destroy
+ struct {
+ celix_thread_mutex_t mutex; //guards svc and props
+ pubsub_serializer_service_t *svc; //set by the tracker callback
+ const celix_properties_t *props; //service properties of svc, set by the tracker callback
+ } serializer;
+};
+
+/**
+ * Creates a topic sender for the given scope/topic.
+ *
+ * Copies scope and topic (at most 1 MiB each) and starts a service tracker for
+ * the serializer service with the given service id; the tracked serializer is
+ * stored through pubsub_udpmcTopicSender_setSerializer.
+ *
+ * @param ctx             The bundle context used for the service tracker.
+ * @param scope           Scope of the topic.
+ * @param topic           Name of the topic.
+ * @param serializerSvcId Service id of the serializer service to track.
+ * @return The new sender (release with pubsub_udpmcTopicSender_destroy), or NULL
+ *         when the sender or its scope/topic copies could not be allocated.
+ */
+pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, long serializerSvcId) {
+    const size_t maxNameLength = 1024 * 1024;
+
+    pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    if (sender == NULL) {
+        return NULL;
+    }
+    sender->ctx = ctx;
+    sender->scope = strndup(scope, maxNameLength);
+    sender->topic = strndup(topic, maxNameLength);
+    if (sender->scope == NULL || sender->topic == NULL) {
+        //no tracker or mutex exists yet, so releasing the memory is enough
+        free(sender->scope);
+        free(sender->topic);
+        free(sender);
+        return NULL;
+    }
+
+    celixThreadMutex_create(&sender->serializer.mutex, NULL);
+
+    char filter[64];
+    snprintf(filter, sizeof(filter), "(service.id=%li)", serializerSvcId);
+
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+    opts.filter.filter = filter;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = sender;
+    opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
+    sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    return sender;
+}
+
+/**
+ * Destroys a topic sender: stops its serializer tracker, destroys the serializer
+ * mutex and frees the scope/topic copies and the sender itself.
+ *
+ * @param sender The sender to destroy; NULL is allowed and does nothing.
+ */
+void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
+    if (sender == NULL) {
+        return;
+    }
+
+    //stop the tracker first, so the callback can no longer touch the mutex
+    celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
+    celixThreadMutex_destroy(&sender->serializer.mutex);
+
+    free(sender->scope);
+    free(sender->topic);
+    free(sender);
+}
+
+/**
+ * Returns the pubsub admin type of the sender, which is the constant
+ * PSA_UDPMC_PUBSUB_ADMIN_TYPE for every sender.
+ */
+const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender __attribute__((unused))) {
+    const char *adminType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+    return adminType;
+}
+
+/**
+ * Returns the serializer type of the currently tracked serializer service.
+ *
+ * The value is read from the serializer service properties under
+ * PUBSUB_SERIALIZER_TYPE_KEY while holding the serializer mutex.
+ *
+ * @return The serializer type, or NULL when no serializer properties are set
+ *         or the property is absent.
+ */
+const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender) {
+    const char *serType = NULL;
+
+    celixThreadMutex_lock(&sender->serializer.mutex);
+    const celix_properties_t *serProps = sender->serializer.props;
+    if (serProps != NULL) {
+        serType = celix_properties_get(serProps, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    }
+    celixThreadMutex_unlock(&sender->serializer.mutex);
+
+    return serType;
+}
+
+/** Returns the scope of the sender; the string stays owned by the sender. */
+const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender) {
+    const char *senderScope = sender->scope;
+    return senderScope;
+}
+
+/** Returns the topic of the sender; the string stays owned by the sender. */
+const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender) {
+    const char *senderTopic = sender->topic;
+    return senderTopic;
+}
+
+/** Returns the socket address stored in the sender (the raw socketAddress field). */
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender) {
+    const char *address = sender->socketAddress;
+    return address;
+}
+
+/** Placeholder for connecting the sender to an endpoint; does nothing yet. */
+void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+    //TODO
+    (void)sender;
+    (void)endpoint;
+}
+
+/** Placeholder for disconnecting the sender from an endpoint; does nothing yet. */
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+    //TODO
+    (void)sender;
+    (void)endpoint;
+}
+
+/**
+ * Service tracker callback: stores the serializer service and its properties in
+ * the sender while holding the serializer mutex.
+ *
+ * @param handle The pubsub_updmc_topic_sender_t registered as callback handle.
+ * @param svc    The serializer service to store.
+ * @param props  The service properties to store alongside svc.
+ */
+static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_updmc_topic_sender_t *self = handle;
+
+    celixThreadMutex_lock(&self->serializer.mutex);
+    self->serializer.svc = (pubsub_serializer_service_t *)svc;
+    self->serializer.props = props;
+    celixThreadMutex_unlock(&self->serializer.mutex);
+}
\ No newline at end of file