You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/10/12 09:03:25 UTC

[08/34] celix git commit: CELIX-454: More PubSub refactoring. Started creating a new skeleton for psa udpmc based on an updated pubsub spi.

CELIX-454: More PubSub refactoring. Started creating a new skeleton for psa udpmc based on an updated pubsub spi.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/69596cfd
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/69596cfd
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/69596cfd

Branch: refs/heads/develop
Commit: 69596cfdf93502eb2f723571394ab36537ea37ee
Parents: e30a70f
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Mon Sep 24 21:15:31 2018 +0200
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Mon Sep 24 21:15:31 2018 +0200

----------------------------------------------------------------------
 bundles/pubsub/CMakeLists.txt                   |    2 +
 bundles/pubsub/examples/CMakeLists.txt          |   40 +-
 .../private/include/pubsub_publisher_private.h  |   13 +-
 .../publisher/private/src/ps_pub_activator.c    |  108 +-
 .../publisher/private/src/pubsub_publisher.c    |   14 +-
 .../pubsub/pubsub_admin_udp_mc/CMakeLists.txt   |    8 +-
 .../pubsub_admin_udp_mc/src/psa_activator.c     |  155 +--
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.c |  318 +++--
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.h |   37 +-
 .../src/pubsub_udpmc_admin.c                    |  335 +++++
 .../src/pubsub_udpmc_admin.h                    |   92 ++
 .../src/pubsub_udpmc_topic_sender.c             |   99 ++
 .../src/pubsub_udpmc_topic_sender.h             |   39 +
 .../pubsub_admin_udp_mc/src/topic_publication.c |   30 +-
 .../pubsub_admin_udp_mc/src/topic_publication.h |    6 +-
 .../src/topic_subscription.c                    |    4 +-
 .../src/topic_subscription.h                    |    4 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c |  245 ++--
 .../pubsub_admin_zmq/src/pubsub_admin_impl.c    |  136 +-
 .../pubsub_admin_zmq/src/pubsub_admin_impl.h    |   42 +-
 .../pubsub_admin_zmq/src/topic_publication.c    |   12 +-
 .../pubsub_admin_zmq/src/topic_publication.h    |    6 +-
 .../pubsub_admin_zmq/src/topic_subscription.h   |    4 +-
 .../src/pubsub_discovery_impl.c                 |  174 ++-
 .../src/pubsub_discovery_impl.h                 |   13 +-
 .../pubsub_serializer_json/CMakeLists.txt       |    2 +-
 .../pubsub_serializer_json/src/ps_activator.c   |  108 --
 .../src/ps_json_serializer_activator.c          |   59 +
 .../src/pubsub_serializer_impl.h                |    9 +-
 bundles/pubsub/pubsub_spi/CMakeLists.txt        |    2 +-
 .../pubsub/pubsub_spi/include/pubsub_admin.h    |   53 +-
 .../pubsub_spi/include/pubsub_admin_match.h     |   47 -
 .../pubsub/pubsub_spi/include/pubsub_common.h   |    4 -
 .../pubsub_spi/include/pubsub_constants.h       |    4 -
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h |   30 +-
 .../pubsub_spi/include/pubsub_serializer.h      |   18 +-
 .../pubsub/pubsub_spi/include/pubsub_utils.h    |   47 +-
 .../pubsub/pubsub_spi/src/pubsub_admin_match.c  |  169 ---
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c |  270 ++--
 bundles/pubsub/pubsub_spi/src/pubsub_utils.c    |   31 +
 .../pubsub/pubsub_spi/src/pubsub_utils_match.c  |  230 ++++
 .../src/pstm_activator.c                        |   38 +-
 .../src/pubsub_topology_manager.c               | 1185 +++++++++---------
 .../src/pubsub_topology_manager.h               |   81 +-
 libs/framework/include/celix_api.h              |   24 +-
 libs/framework/include/celix_bundle_context.h   |   11 +
 libs/framework/src/bundle_context.c             |   14 +
 libs/framework/src/framework.c                  |    2 +-
 libs/utils/include/celix_properties.h           |    6 +-
 libs/utils/include/celix_threads.h              |    7 +-
 libs/utils/src/celix_threads.c                  |   18 +-
 libs/utils/src/properties.c                     |   36 +-
 52 files changed, 2570 insertions(+), 1871 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index e3db995..d7ae3f8 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -19,6 +19,8 @@ celix_subproject(PUBSUB "Option to build the pubsub bundles" OFF DEPS UTILS)
 if (PUBSUB)
 
     option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" OFF)
+	set(BUILD_PUBSUB_PSA_ZMQ OFF)
+	message(WARNING "TODO enable PSA_ZMQ again, for now disabled because refactoring is needed")
     if (BUILD_PUBSUB_PSA_ZMQ)
 		message(WARNING "Celix will now contain a dependency with a LGPL License (ZeroMQ). For more information about this, consult the pubsub/README.md file.")
 		option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF)

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index 58fbd92..41a3406 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -29,8 +29,9 @@ set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES}
 
 # UDP Multicast
 add_celix_container(pubsub_publisher_udp_mc
-        GROUP pubsub
-        BUNDLES
+    GROUP pubsub
+    BUNDLES
+        Celix::log_service
         Celix::shell
         Celix::shell_tui
         Celix::pubsub_serializer_json
@@ -39,12 +40,17 @@ add_celix_container(pubsub_publisher_udp_mc
         Celix::pubsub_admin_udp_multicast
         celix_pubsub_poi_publisher
         celix_pubsub_poi_publisher2
-        )
+    PROPERTIES
+        PSA_UDPMC_VERBOSE=true
+        PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+        PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+)
 target_link_libraries(pubsub_publisher_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
-add_celix_container("pubsub_subscriber_udp_mc"
-        GROUP "pubsub"
-        BUNDLES
+add_celix_container(pubsub_subscriber_udp_mc
+    GROUP "pubsub"
+    BUNDLES
+        Celix::log_service
         Celix::shell
         Celix::shell_tui
         Celix::pubsub_serializer_json
@@ -52,11 +58,18 @@ add_celix_container("pubsub_subscriber_udp_mc"
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_udp_multicast
         celix_pubsub_poi_subscriber
-        )
+    PROPERTIES
+        PSA_UDPMC_VERBOSE=true
+        PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+        PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+)
 target_link_libraries(pubsub_subscriber_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
-add_celix_container("pubsub_subscriber2_udp_mc"
-        GROUP "pubsub"
-        BUNDLES
+
+
+add_celix_container(pubsub_subscriber2_udp_mc
+    GROUP "pubsub"
+    BUNDLES
+        Celix::log_service
         Celix::shell
         Celix::shell_tui
         Celix::pubsub_serializer_json
@@ -64,8 +77,13 @@ add_celix_container("pubsub_subscriber2_udp_mc"
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_udp_multicast
         celix_pubsub_poi_subscriber
-        )
+    PROPERTIES
+        PSA_UDPMC_VERBOSE=true
+        PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+        PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+)
 target_link_libraries(pubsub_subscriber2_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
 if (ETCD_CMD AND XTERM_CMD)
     #Runtime starting a publish and subscriber for udp mc
     add_celix_runtime(pubsub_rt_upd_mc

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
index 56ec678..c27ad38 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
+++ b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
@@ -16,18 +16,11 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_publisher_private.h
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
 
 #ifndef PUBSUB_PUBLISHER_PRIVATE_H_
 #define PUBSUB_PUBLISHER_PRIVATE_H_
 
-#include <celixbool.h>
+#include "celix_api.h"
 #include <pthread.h>
 #include "pubsub/publisher.h"
 
@@ -53,8 +46,8 @@ void publisher_stop(pubsub_sender_pt client);
 
 void publisher_destroy(pubsub_sender_pt client);
 
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
+void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props);
+void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props);
 
 
 #endif /* PUBSUB_PUBLISHER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index 0da3ffc..617163c 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -16,21 +16,12 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * ps_pub_activator.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
 
 #include <sys/cdefs.h>
 #include <stdlib.h>
 #include <string.h>
 
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
+#include "celix_api.h"
 
 #include "pubsub/publisher.h"
 #include "pubsub_publisher_private.h"
@@ -47,48 +38,27 @@ struct publisherActivator {
 	array_list_pt trackerList;//List<service_tracker_pt>
 };
 
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-
-	const char* fwUUID = NULL;
-
-	bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID==NULL){
+static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx) {
+	const char *fwUUID = celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+	if (fwUUID==NULL) {
 		printf("PUBLISHER: Cannot retrieve fwUUID.\n");
 		return CELIX_INVALID_BUNDLE_CONTEXT;
 	}
 
-	struct publisherActivator * act = malloc(sizeof(*act));
-
-	bundle_pt bundle = NULL;
-	long bundleId = 0;
-	bundleContext_getBundle(context,&bundle);
-	bundle_getBundleId(bundle,&bundleId);
 
-	arrayList_create(&(act->trackerList));
-	act->client = publisher_create(act->trackerList,fwUUID,bundleId);
-	*userData = act;
+	bundle_t *bnd = celix_bundleContext_getBundle(ctx);
+	long bundleId = celix_bundle_getId(bnd);
 
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
-	struct publisherActivator * act = (struct publisherActivator *) userData;
+	act->trackerList = celix_arrayList_create();
+	act->client = publisher_create(act->trackerList, fwUUID, bundleId);
 
 	int i;
 	char filter[128];
 	for(i=0; PUB_TOPICS[i] != NULL; i++){
 		const char* topic = PUB_TOPICS[i];
-
-		bundle_pt bundle = NULL;
-		long bundleId = 0;
-		bundleContext_getBundle(context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
-
-		service_tracker_pt tracker = NULL;
 		memset(filter,0,128);
 #ifdef USE_SCOPE
-			char *scope;
+		char *scope;
 			asprintf(&scope, "my_scope_%d", i);
 			snprintf(filter, 128, "(&(&(%s=%s)(%s=%s))(%s=%s))",
 					(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME,
@@ -96,52 +66,36 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 					PUBLISHER_SCOPE, scope);
 			free(scope);
 #else
-		snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, topic);
+		snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic);
 #endif
-		service_tracker_customizer_pt customizer = NULL;
-		serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
-		serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
-		arrayList_add(act->trackerList,tracker);
-
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.callbackHandle = act->client;
+        opts.addWithProperties = publisher_publishSvcAdded;
+        opts.removeWithProperties = publisher_publishSvcRemoved;
+        opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        long trackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+        celix_arrayList_addLong(act->trackerList,trackerId);
 	}
 
 	publisher_start(act->client);
 
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_open(tracker);
-	}
-
-	return CELIX_SUCCESS;
+	return 0;
 }
 
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-	int i;
+static int pub_stop(struct publisherActivator *act, celix_bundle_context_t *ctx) {
+    for (int i=0; i < arrayList_size(act->trackerList); i++) {
+        long trkId = celix_arrayList_getLong(act->trackerList,i);
+        celix_bundleContext_stopTracker(ctx, trkId);
+    }
+    publisher_stop(act->client);
 
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_close(tracker);
-	}
-	publisher_stop(act->client);
+    publisher_destroy(act->client);
+    celix_arrayList_destroy(act->trackerList);
 
-	return CELIX_SUCCESS;
+    return 0;
 }
 
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt  __attribute__((unused)) context) {
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-	int i;
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_destroy(tracker);
-	}
-
-	publisher_destroy(act->client);
-	arrayList_destroy(act->trackerList);
-
-	free(act);
-	printf("PUBLISHER: bundleActivator_destroy\n");
-	return CELIX_SUCCESS;
-}
+CELIX_GEN_BUNDLE_ACTIVATOR(struct publisherActivator, pub_start, pub_stop)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 9a7aedc..5369a22 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -134,28 +134,25 @@ void publisher_destroy(pubsub_sender_pt client) {
 	free(client);
 }
 
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
-	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
+void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) {
+	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)svc;
 	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
 
 	printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
 
 	send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
-	const char *value = NULL;
-	serviceReference_getProperty(reference, PUBSUB_PUBLISHER_TOPIC, &value);
 	data->service = publish_svc;
 	data->publisher = manager;
-	data->topic = value;
+	data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
 	celix_thread_t *tid = malloc(sizeof(*tid));
 	stop=false;
 	celixThread_create(tid,NULL,send_thread,(void*)data);
 	hashMap_put(manager->tid_map, publish_svc, tid);
-	return CELIX_SUCCESS;
 }
 
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
+void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) {
 	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-	celix_thread_t *tid = hashMap_get(manager->tid_map, service);
+	celix_thread_t *tid = hashMap_get(manager->tid_map, svc);
 
 #if defined(__APPLE__) && defined(__MACH__)
 	uint64_t threadid;
@@ -168,5 +165,4 @@ celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt r
 	stop=true;
 	celixThread_join(*tid,NULL);
 	free(tid);
-	return CELIX_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index d90d605..94ce5cc 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -23,9 +23,11 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast
 	GROUP "Celix/PubSub"
 	SOURCES
         src/psa_activator.c
-        src/pubsub_admin_impl.c
-        src/topic_subscription.c
-        src/topic_publication.c
+		src/pubsub_udpmc_admin.c
+		src/pubsub_udpmc_topic_sender.c
+        #src/pubsub_admin_impl.c
+        #src/topic_subscription.c
+        #src/topic_publication.c
         src/large_udp.c
 )
 target_include_directories(celix_pubsub_admin_udp_multicast PRIVATE

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index cd4ee07..406720e 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -16,126 +16,83 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * psa_activator.c
- *
- *  \date       Sep 30, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-#include "service_registration.h"
-#include "service_tracker.h"
-
-#include "pubsub_admin_impl.h"
 
-struct activator {
-	pubsub_admin_pt admin;
-	pubsub_admin_service_pt adminService;
-	service_registration_pt registration;
-	service_tracker_pt serializerTracker;
-};
 
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator;
+#include <stdlib.h>
 
-	activator = calloc(1, sizeof(*activator));
-	if (!activator) {
-		status = CELIX_ENOMEM;
-	}
-	else{
-		*userData = activator;
-
-		status = pubsubAdmin_create(context, &(activator->admin));
-
-		if(status == CELIX_SUCCESS){
-			service_tracker_customizer_pt customizer = NULL;
-			status = serviceTrackerCustomizer_create(activator->admin,
-					NULL,
-					pubsubAdmin_serializerAdded,
-					NULL,
-					pubsubAdmin_serializerRemoved,
-					&customizer);
-			if(status == CELIX_SUCCESS){
-				status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
-				if(status != CELIX_SUCCESS){
-					serviceTrackerCustomizer_destroy(customizer);
-					pubsubAdmin_destroy(activator->admin);
-				}
-			}
-			else{
-				pubsubAdmin_destroy(activator->admin);
-			}
-		}
-	}
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
 
-	return status;
-}
+#include "pubsub_admin.h"
+#include "pubsub_udpmc_admin.h"
+#include "../../../shell/shell/include/command.h"
 
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
-	pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc));
+typedef struct psa_udpmc_activator {
+	log_helper_t *logHelper;
 
-	if (!pubsubAdminSvc) {
-		status = CELIX_ENOMEM;
-	}
-	else{
-		pubsubAdminSvc->admin = activator->admin;
+	pubsub_udpmc_admin_t *admin;
 
-		pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
-		pubsubAdminSvc->removePublication = pubsubAdmin_removePublication;
+	pubsub_admin_service_t adminService;
+	long adminSvcId;
 
-		pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
-		pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription;
+	command_service_t cmdSvc;
+	long cmdSvcId;
+} psa_udpmc_activator_t;
 
-		pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
-		pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
+	act->adminSvcId = -1L;
+	act->cmdSvcId = -1L;
 
-		pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
+	logHelper_create(ctx, &act->logHelper);
+	logHelper_start(act->logHelper);
 
-		activator->adminService = pubsubAdminSvc;
+	act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper);
+	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
 
-		status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
+	//register pubsub admin service
+	if (status == CELIX_SUCCESS) {
+		pubsub_admin_service_t *psaSvc = &act->adminService;
+		psaSvc->handle = act->admin;
+		psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
+		psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
+		psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+		psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
+		psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
+		psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReciever;
+		psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReciever;
+		psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
+		psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
 
-		status += serviceTracker_open(activator->serializerTracker);
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);
 
+		act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
 	}
 
+	//register shell command service
+	{
+		act->cmdSvc.handle = act->admin;
+		act->cmdSvc.executeCommand = pubsub_udpmcAdmin_executeCommand;
+		celix_properties_t *props = celix_properties_create();
+		celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc");
+		celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc");
+		celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA");
+		celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+	}
 
 	return status;
 }
 
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
-
-	status += serviceTracker_close(activator->serializerTracker);
-	status += serviceRegistration_unregister(activator->registration);
-
-	activator->registration = NULL;
-
-	free(activator->adminService);
-	activator->adminService = NULL;
-
-	return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
-
-	serviceTracker_destroy(activator->serializerTracker);
-	pubsubAdmin_destroy(activator->admin);
-	activator->admin = NULL;
+int psa_udpmc_stop(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
+	celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+	pubsub_udpmcAdmin_destroy(act->admin);
 
-	free(activator);
+	logHelper_stop(act->logHelper);
+	logHelper_destroy(&act->logHelper);
 
-	return status;
+	return CELIX_SUCCESS;
 }
 
-
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_udpmc_activator_t, psa_udpmc_start, psa_udpmc_stop);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
index 0638efb..2f62f6f 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -16,13 +16,6 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_admin_impl.c
- *
- *  \date       Sep 30, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -61,20 +54,20 @@
 #include "topic_publication.h"
 #include "pubsub_endpoint.h"
 #include "pubsub/subscriber.h"
-#include "pubsub_admin_match.h"
+#include "pubsub_utils.h"
 
 static const char *DEFAULT_MC_IP = "224.100.1.1";
 static char *DEFAULT_MC_PREFIX = "224.100";
 
 static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t *admin,celix_properties_t *subEP);
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t *admin,celix_properties_t *subEP);
 
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType);
-static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, celix_properties_t *ep, pubsub_serializer_service_t **out, const char **serType);
+static void connectTopicPubSubToSerializer(pubsub_admin_t *admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void *topicPubSub,bool isPublication);
 
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	*admin = calloc(1, sizeof(**admin));
@@ -239,7 +232,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 }
 
 
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin)
 {
 	celix_status_t status = CELIX_SUCCESS;
 
@@ -325,7 +318,7 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 	return status;
 }
 
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t *admin,celix_properties_t *subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
@@ -342,7 +335,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 		else{
 			if (admin->verbose) {
 				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-					   properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+					   properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 			}
 
 			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -362,9 +355,9 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 
 				if(topic_publishers!=NULL){
 					for(i=0;i<arrayList_size(topic_publishers);i++){
-						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-						if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+						celix_properties_t *pubEP = arrayList_get(topic_publishers,i);
+						if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 						}
 					}
 					arrayList_destroy(topic_publishers);
@@ -380,9 +373,9 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 				array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
 				if(ext_pub_list!=NULL){
 					for(i=0;i<arrayList_size(ext_pub_list);i++){
-						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-						if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+						celix_properties_t *pubEP = arrayList_get(ext_pub_list,i);
+						if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 						}
 					}
 				}
@@ -409,10 +402,10 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 	return status;
 }
 
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_t *admin, celix_properties_t *subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	if(strcmp(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
+	if(strcmp(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
 		return pubsubAdmin_addAnySubscription(admin,subEP);
 	}
 
@@ -422,7 +415,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_lock(&admin->localPublicationsLock);
 	celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -437,11 +430,11 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 			pubsub_serializer_service_t *best_serializer = NULL;
             const char *serType = NULL;
 			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
-				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
+				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
 			} else {
 				if (admin->verbose) {
 					printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-						   properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+						   properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 				}
 
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -450,6 +443,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 			}
 
 			if (status==CELIX_SUCCESS){
+				//got type and serializer -> update endpoint
+				celix_properties_set(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+				celix_properties_set(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType);
 
 				/* Try to connect internal publishers */
 				if(factory!=NULL){
@@ -458,9 +454,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 
 					if(topic_publishers!=NULL){
 						for(i=0;i<arrayList_size(topic_publishers);i++){
-							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-							if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-								status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+							celix_properties_t *pubEP = arrayList_get(topic_publishers,i);
+							if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+								status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 							}
 						}
 						arrayList_destroy(topic_publishers);
@@ -471,9 +467,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 				/* Look also for external publishers */
 				if(ext_pub_list!=NULL){
 					for(i=0;i<arrayList_size(ext_pub_list);i++){
-						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-						if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+						celix_properties_t *pubEP = arrayList_get(ext_pub_list,i);
+						if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 						}
 					}
 				}
@@ -505,38 +501,38 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 
     if (admin->verbose) {
         printf("[PSA_UDPMC] Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+               properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+               properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP, PUBSUB_ENDPOINT_TYPE));
+        printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
     }
 
 	return status;
 
 }
 
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_t *admin,celix_properties_t *subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
     if (admin->verbose) {
         printf("[PSA_UDPMC] Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+               properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+               properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP, PUBSUB_ENDPOINT_TYPE));
+        printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
     }
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -565,7 +561,7 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 
 }
 
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_t *admin,celix_properties_t *pubEP){
 	celix_status_t status = CELIX_SUCCESS;
 
     const char* fwUUID = NULL;
@@ -575,22 +571,22 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
         return CELIX_INVALID_BUNDLE_CONTEXT;
     }
 
-    const char *epFwUUID = properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    const char *epFwUUID = properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
     bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
 
     if (isOwn) {
         //should be null, willl be set in this call
-        assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) == NULL);
-        assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
     }
 
     if (isOwn) {
-        properties_set(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+        properties_set(pubEP, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
     }
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if ((strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) {
+	if ((strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) {
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 
@@ -603,11 +599,11 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
 				status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub);
                 if (isOwn) {
-                    properties_set(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                    properties_set(pubEP, PUBSUB_SERIALIZER_TYPE_KEY, serType);
                 }
 			} else {
 				printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
-					   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+					   properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 				arrayList_add(admin->noSerializerPublications,pubEP);
@@ -622,8 +618,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 				}
 			} else {
 				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s.\n",
-					   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-					   properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+					   properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+					   properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 			}
 		} else {
 			//just add the new EP to the list
@@ -655,7 +651,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 		array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
 		int i;
 		for (i = 0; i < arrayList_size(pendingSubList); i++) {
-			pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
+			celix_properties_t *subEP = arrayList_get(pendingSubList, i);
 			pubsubAdmin_addSubscription(admin, subEP);
 		}
 		hashMap_remove(admin->pendingSubscriptions, scope_topic);
@@ -670,14 +666,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 
 	topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
-	if (sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
-		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+	if (sub != NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
+		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	/* And check also for ANY subscription */
 	topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-	if (any_sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
-		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+	if (any_sub != NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
+		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	free(scope_topic);
@@ -686,16 +682,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 
     if (admin->verbose) {
         printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+               properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
+               properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP, PUBSUB_ENDPOINT_TYPE));
         printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
-               properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),
+               properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),
                isOwn);
     }
 
@@ -703,21 +699,21 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 
 }
 
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_t *admin,celix_properties_t *pubEP){
 	celix_status_t status = CELIX_SUCCESS;
 	int count = 0;
 
     if (admin->verbose) {
-        printf("PSA_UDPMC: Adding publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: Remove publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
         printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
-               properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+               properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
     }
 
 	const char* fwUUID = NULL;
@@ -727,9 +723,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
 		return CELIX_INVALID_BUNDLE_CONTEXT;
 	}
-	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+	if(strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -757,7 +753,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 			int i;
 			bool found = false;
 			for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
-				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+				celix_properties_t *p  = arrayList_get(ext_pub_list,i);
 				found = pubsubEndpoint_equals(pubEP,p);
 				if (found){
 					arrayList_remove(ext_pub_list,i);
@@ -765,8 +761,8 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 			}
 			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
 			for(i=0; i<arrayList_size(ext_pub_list);i++) {
-				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-				if (strcmp(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) {
+				celix_properties_t *p  = arrayList_get(ext_pub_list,i);
+				if (strcmp(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) {
 					count++;
 				}
 			}
@@ -788,14 +784,14 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 
 	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-	if(sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+	if(sub!=NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	/* And check also for ANY subscription */
 	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-	if(any_sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
+	if(any_sub!=NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
 	}
 
 	free(scope_topic);
@@ -805,7 +801,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 
 }
 
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_t *admin,char *scope, char* topic){
 	celix_status_t status = CELIX_SUCCESS;
 
 	printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic);
@@ -831,7 +827,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop
 
 }
 
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_t *admin,char *scope, char* topic){
 	celix_status_t status = CELIX_SUCCESS;
 
 	printf("PSA_UDP_MC: Closing all subscriptions\n");
@@ -891,10 +887,10 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
 }
 #endif
 
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t *admin,celix_properties_t *subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
 	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
 	if(pendingListPerTopic==NULL){
 		arrayList_create(&pendingListPerTopic);
@@ -921,7 +917,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 		return CELIX_SERVICE_EXCEPTION;
 	}
 
-	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+	pubsub_admin_t *admin = (pubsub_admin_t*)handle;
 	celixThreadMutex_lock(&admin->serializerListLock);
 	arrayList_add(admin->serializerList, reference);
 	celixThreadMutex_unlock(&admin->serializerListLock);
@@ -930,7 +926,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 
 	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
-		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+		celix_properties_t *ep = arrayList_get(admin->noSerializerSubscriptions,i);
 		pubsub_serializer_service_t *best_serializer = NULL;
 		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
 		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
@@ -939,7 +935,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	}
 
 	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
-		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+		celix_properties_t *ep = arrayList_get(admin->noSerializerPublications,i);
 		pubsub_serializer_service_t *best_serializer = NULL;
 		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
 		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
@@ -958,7 +954,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 
 celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
 
-	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+	pubsub_admin_t *admin = (pubsub_admin_t*)handle;
 	int i=0, j=0;
 	const char *serType = NULL;
 
@@ -987,12 +983,12 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 			/* Get the endpoints that are going to be orphan */
 			array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
 			for(j=0;j<arrayList_size(pubList);j++){
-				pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
+				celix_properties_t *pubEP = arrayList_get(pubList,j);
 				/* Remove the publication */
 				pubsubAdmin_removePublication(admin, pubEP);
 				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-				if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
-					properties_unset(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
+				if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
+					properties_unset(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
 				}
 				/* Add the orphan endpoint to the noSerializer pending list */
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1038,12 +1034,12 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 			/* Get the endpoints that are going to be orphan */
 			array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
 			for(j=0;j<arrayList_size(subList);j++){
-				pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
+				celix_properties_t *subEP = arrayList_get(subList,j);
 				/* Remove the subscription */
 				pubsubAdmin_removeSubscription(admin, subEP);
 				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
-				if(properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
-					properties_unset(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
+				if(properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
+					properties_unset(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
 				}
 				/* Add the orphan endpoint to the noSerializer pending list */
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -1084,24 +1080,10 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
-	celix_status_t status = CELIX_SUCCESS;
-
-    const char *fwUuid = NULL;
-    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
-    if (fwUuid == NULL) {
-        return CELIX_ILLEGAL_STATE;
-    }
-
-	celixThreadMutex_lock(&admin->serializerListLock);
-	status = pubsub_admin_match(endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList,score);
-	celixThreadMutex_unlock(&admin->serializerListLock);
-
-	return status;
-}
 
 /* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType){
+/*
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, celix_properties_t *ep, pubsub_serializer_service_t **out, const char **serType){
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_serializer_service_t *serSvc = NULL;
@@ -1122,9 +1104,9 @@ static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsu
 	*out = serSvc;
 
 	return status;
-}
+}*/
 
-static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+static void connectTopicPubSubToSerializer(pubsub_admin_t *admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
 
 	celixThreadMutex_lock(&admin->usedSerializersLock);
 
@@ -1140,7 +1122,7 @@ static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializ
 
 }
 
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void *topicPubSub,bool isPublication){
 
 	celixThreadMutex_lock(&admin->usedSerializersLock);
 
@@ -1157,3 +1139,83 @@ static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topi
 	celixThreadMutex_unlock(&admin->usedSerializersLock);
 
 }
+/* Scores this UDPMC admin for a publisher request by delegating to pubsub_utils_matchPublisher(); outScore and outSerializerSvcId are optional (skipped when NULL). Always returns CELIX_SUCCESS. */
+celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+	pubsub_admin_t *admin = handle; /* handle is used as the pubsub_admin_t instance */
+	celix_status_t status = CELIX_SUCCESS; /* never modified below */
+	celixThreadMutex_lock(&admin->serializerListLock); /* held while admin->serializerList is passed to the match helper */
+	long serializerSvcId = -1L; /* presumably "no serializer selected" unless the helper overwrites it -- TODO confirm in pubsub_utils */
+	double score = pubsub_utils_matchPublisher(admin->bundle_context, svcRequesterBndId, svcFilter->filterStr, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId);
+	celixThreadMutex_unlock(&admin->serializerListLock); /* NOTE(review): svcFilter is dereferenced above without a NULL check */
+	if (outScore != NULL) {
+		*outScore = score;
+	}
+	if (outSerializerSvcId != NULL) {
+		*outSerializerSvcId = serializerSvcId;
+	}
+	return status;
+}
+/* Scores this UDPMC admin for a subscriber service by delegating to pubsub_utils_matchSubscriber(); outScore and outSerializerSvcId are optional (skipped when NULL). Always returns CELIX_SUCCESS. */
+celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+	pubsub_admin_t *admin = handle; /* handle is used as the pubsub_admin_t instance */
+	celix_status_t status = CELIX_SUCCESS; /* never modified below */
+	celixThreadMutex_lock(&admin->serializerListLock); /* held while admin->serializerList is passed to the match helper */
+	long serializerSvcId = -1L; /* presumably "no serializer selected" unless the helper overwrites it -- TODO confirm in pubsub_utils */
+	double score = pubsub_utils_matchSubscriber(admin->bundle_context, svcProviderBndId, svcProperties, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+	if (outScore != NULL) {
+		*outScore = score;
+	}
+	if (outSerializerSvcId != NULL) {
+		*outSerializerSvcId = serializerSvcId;
+	}
+	return status;
+}
+/* Scores this UDPMC admin for a discovered endpoint by delegating to pubsub_utils_matchEndpoint(); outScore is optional (skipped when NULL). Always returns CELIX_SUCCESS. */
+celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) {
+	pubsub_admin_t *admin = handle; /* handle is used as the pubsub_admin_t instance */
+	celix_status_t status = CELIX_SUCCESS; /* never modified below */
+	celixThreadMutex_lock(&admin->serializerListLock); /* held while admin->serializerList is passed to the match helper */
+	long serializerSvcId = -1L; /* passed to the helper by address but not reported: this function has no output parameter for it */
+	double score = pubsub_utils_matchEndpoint(admin->bundle_context, endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+	if (outScore != NULL) {
+		*outScore = score;
+	}
+	return status;
+}
+/* Skeleton stub: creates no topic sender yet. scope, topic and serializerSvcId are not read, *publisherEndpoint is left untouched, and CELIX_SUCCESS is always returned. */
+celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) {
+	pubsub_admin_t *admin = handle; /* assigned but not used further in this stub */
+	celix_status_t status = CELIX_SUCCESS;
+	return status;
+}
+celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) { /* skeleton stub: tears nothing down yet */
+	pubsub_admin_t *admin = handle; /* assigned but not used further in this stub */
+	celix_status_t status = CELIX_SUCCESS; /* scope and topic are not read; success is always reported */
+	return status;
+}
+/* Skeleton stub: creates no topic receiver yet. scope, topic and serializerSvcId are not read, *subscriberEndpoint is left untouched, and CELIX_SUCCESS is always returned. */
+celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+	pubsub_admin_t *admin = handle; /* assigned but not used further in this stub */
+	celix_status_t status = CELIX_SUCCESS;
+	return status;
+}
+/* Skeleton stub: tears down no topic receiver yet. scope and topic are not read and CELIX_SUCCESS is always returned. */
+celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) {
+	pubsub_admin_t *admin = handle; /* assigned but not used further in this stub */
+	celix_status_t status = CELIX_SUCCESS;
+	return status;
+}
+/* Skeleton stub: the endpoint properties are not read or stored yet; CELIX_SUCCESS is always returned. */
+celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+	pubsub_admin_t *admin = handle; /* assigned but not used further in this stub */
+	celix_status_t status = CELIX_SUCCESS;
+	return status;
+}
+/* Skeleton stub: the endpoint properties are not read and nothing is removed yet; CELIX_SUCCESS is always returned. */
+celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+	pubsub_admin_t *admin = handle; /* assigned but not used further in this stub */
+	celix_status_t status = CELIX_SUCCESS;
+	return status;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
index b82e8a1..7618a6e 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -32,9 +32,10 @@
 #include "log_helper.h"
 
 
+#define PUBSUB_PSA_UDPMC_PSA_TYPE					"udp_mc"
 #define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"pubsub.udpmc.socket_address"
 
-struct pubsub_admin {
+typedef struct pubsub_admin {
 
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
@@ -77,24 +78,34 @@ struct pubsub_admin {
 	double defaultScore;
 
 	bool verbose;
-};
+} pubsub_admin_t;
 
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin);
 
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+void pubsubAdmin_addSerializer(void * handle, void *svc, const celix_properties_t *properties);
+void pubsubAdmin_removeSerializer(void * handle, void *svc, const celix_properties_t *properties);
 
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic);
 
-celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service);
 
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+//for the pubsub_admin_service
+
+celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
+celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
+
+//note endpoint is owned by caller
+celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+//note endpoint is owned by caller
+celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+
 
 
 #endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
new file mode 100644
index 0000000..650e439
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -0,0 +1,335 @@
+
+#include <errno.h>
+#include <memory.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <pubsub_endpoint.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_udpmc_admin.h"
+#include "pubsub_psa_udpmc_constants.h"
+#include "pubsub_udpmc_topic_sender.h"
+
+#define PUBSUB_UDPMC_MC_IP_DEFAULT                     "224.100.1.1"
+
+/*
+ * Logging shorthands. Each one expands to a single logHelper_log() call on
+ * psa->log, so a variable named `psa` must be in scope at the call site.
+ * None of the expansions ends in a semicolon; the caller supplies it, which
+ * keeps the macros usable as the body of an unbraced if/else.
+ */
+#define LOG_DEBUG(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define LOG_INFO(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define LOG_WARN(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define LOG_ERROR(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+
+static celix_status_t udpmc_getIpAddress(const char* interface, char** ip);
+
+/**
+ * Creates and configures a UDP multicast pubsub admin.
+ *
+ * Configuration is read from bundle context properties:
+ *  - PUBSUB_UDPMC_VERBOSE_KEY: enables the informational log lines.
+ *  - PUBSUB_UDPMC_ITF_KEY: name of the network interface to use; when unset
+ *    the first IPv4 address reported by udpmc_getIpAddress() is taken.
+ *  - PUBSUB_UDPMC_IP_KEY: explicit multicast IP address. Only when this is
+ *    unset is the address derived from PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY
+ *    plus the last two octets of the interface address.
+ *  - PSA_UDPMC_*_SCORE_KEY: the scores handed to the match functions.
+ *
+ * A failure to set up the send socket is logged but is not fatal; in that
+ * case psa->sendSocket is left at -1.
+ *
+ * @param ctx       bundle context used for property lookup; stored in the admin.
+ * @param logHelper log helper used by the LOG_* macros; stored in the admin.
+ * @return the new admin (release with pubsub_udpmcAdmin_destroy()), or NULL
+ *         when the admin itself could not be allocated.
+ */
+pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) {
+    pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa));
+    if (psa == NULL) {
+        return NULL;
+    }
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->sendSocket = -1; //stays -1 unless the socket below is fully configured
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_UDPMC_VERBOSE_KEY, PUBSUB_UDPMC_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    //1) Resolve the IP address of the local interface
+    char *if_ip = NULL;
+    const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL);
+    if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
+        //interface may be NULL (no specific interface requested); never pass NULL to %s
+        LOG_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface != NULL ? interface : "<any>");
+    } else if (psa->verbose) {
+        LOG_INFO("[PSA_UDPMC] Using IP address %s", if_ip);
+    }
+
+    //2) Determine the multicast IP: an explicitly configured address wins,
+    //   otherwise derive it from the prefix and the interface address.
+    char *mc_ip = NULL;
+    const char *mcIpProp = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_IP_KEY, NULL);
+    if (mcIpProp != NULL) {
+        mc_ip = strdup(mcIpProp);
+    } else {
+        int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
+        const char *mc_prefix = celix_bundleContext_getProperty(ctx,
+                PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY,
+                PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT);
+        if (if_ip != NULL && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
+            LOG_WARN("[PSA_UDPMC] Could not parse IP address %s", if_ip);
+            b2 = 1;
+            b3 = 1;
+        }
+        if (asprintf(&mc_ip, "%s.%d.%d", mc_prefix, b2, b3) < 0) {
+            mc_ip = NULL; //content is undefined after a failed asprintf; use the default below
+        }
+    }
+
+    //3) Store the interface address (loopback as fallback) before it is
+    //   needed for the socket setup.
+    if (if_ip != NULL) {
+        psa->ifIpAddress = if_ip;
+    } else {
+        psa->ifIpAddress = strdup("127.0.0.1");
+    }
+    if (psa->verbose) {
+        LOG_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress != NULL ? psa->ifIpAddress : "<none>");
+    }
+
+    //4) Create and configure the send socket
+    int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+    if (sendSocket == -1) {
+        LOG_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno));
+    } else {
+        char loop = 1;
+        int rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
+        if (rc != 0) {
+            LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno));
+        } else {
+            struct in_addr multicast_interface;
+            if (psa->ifIpAddress == NULL || inet_aton(psa->ifIpAddress, &multicast_interface) == 0) {
+                LOG_ERROR("[PSA_UDPMC] Cannot use interface address for IP_MULTICAST_IF");
+                rc = -1;
+            } else {
+                rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface));
+                if (rc != 0) {
+                    LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
+                }
+            }
+        }
+        if (rc == 0) {
+            psa->sendSocket = sendSocket;
+        } else {
+            close(sendSocket); //do not leak a half-configured descriptor
+        }
+    }
+
+    //5) Store the multicast address, falling back to the compiled-in default
+    if (mc_ip != NULL) {
+        psa->mcIpAddress = mc_ip;
+    } else {
+        psa->mcIpAddress = strdup(PUBSUB_UDPMC_MC_IP_DEFAULT);
+    }
+    if (psa->verbose) {
+        LOG_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress != NULL ? psa->mcIpAddress : "<none>");
+    }
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_DEFAULT_SCORE_KEY, PSA_UDPMC_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE);
+
+    //6) Lookup tables, each guarded by its own mutex
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->connectedEndpoints.mutex, NULL);
+    psa->connectedEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+    return psa;
+}
+
+/**
+ * Destroys an admin: closes the send socket, destroys the three lookup maps
+ * and their mutexes, frees the address strings and finally the admin itself.
+ * A NULL admin is ignored.
+ *
+ * The maps are destroyed with hashMap_destroy(map, true, false), i.e. keys
+ * are freed but values are not, so entries still present are NOT released.
+ */
+void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
+    if (psa == NULL) {
+        return;
+    }
+
+    //note: assuming all psa registered services and service trackers are removed.
+
+    //The admin is calloc'ed, so 0 can simply mean "no socket stored"; only
+    //close descriptors that were really assigned.
+    if (psa->sendSocket > 0) {
+        close(psa->sendSocket);
+        psa->sendSocket = -1;
+    }
+
+    celixThreadMutex_destroy(&psa->topicSenders.mutex);
+    hashMap_destroy(psa->topicSenders.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+    hashMap_destroy(psa->topicReceivers.map, true, false);
+
+    celixThreadMutex_destroy(&psa->connectedEndpoints.mutex);
+    hashMap_destroy(psa->connectedEndpoints.map, true, false);
+
+    free(psa->mcIpAddress);
+    free(psa->ifIpAddress);
+    free(psa);
+}
+
+/**
+ * Scores how well this admin matches a publisher request.
+ *
+ * The score is computed by pubsub_utils_matchPublisher() from the requesting
+ * bundle id, the service filter string and the admin's configured scores.
+ *
+ * @param handle             the pubsub_udpmc_admin_t instance.
+ * @param svcRequesterBndId  id of the bundle requesting the publisher service.
+ * @param svcFilter          filter of the request; must not be NULL.
+ * @param outScore           receives the score; may be NULL.
+ * @param outSerializerSvcId passed through to pubsub_utils_matchPublisher().
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+    pubsub_udpmc_admin_t *psa = handle;
+    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+    //outScore is optional, consistent with matchSubscriber/matchEndpoint
+    if (outScore != NULL) {
+        *outScore = score;
+    }
+
+    return status;
+}
+
+/**
+ * Scores how well this admin matches a subscriber service, using
+ * pubsub_utils_matchSubscriber() and the admin's configured scores.
+ * The score is written to outScore when that pointer is not NULL.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_udpmc_admin_t *psa = handle;
+    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
+
+    const double matchScore = pubsub_utils_matchSubscriber(
+            psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+            outSerializerSvcId);
+
+    if (outScore != NULL) {
+        *outScore = matchScore;
+    }
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Scores how well this admin matches a discovered endpoint, using
+ * pubsub_utils_matchEndpoint() and the admin's configured scores.
+ * The score is written to outScore when that pointer is not NULL.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) {
+    pubsub_udpmc_admin_t *psa = handle;
+    LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
+
+    const double matchScore = pubsub_utils_matchEndpoint(
+            psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE,
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
+            NULL);
+
+    if (outScore != NULL) {
+        *outScore = matchScore;
+    }
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates a topic sender for scope/topic and registers it in the admin.
+ *
+ * Steps:
+ *  1) Create the TopicSender
+ *  2) Store the TopicSender (keyed by the scope/topic key)
+ *  3) Connect existing endpoints (TODO)
+ *  4) Hand the publisher endpoint to the caller
+ *
+ * Failures (sender already exists, sender could not be created, endpoint not
+ * valid) are logged; in those cases *outPublisherEndpoint is left untouched.
+ *
+ * @param handle               the pubsub_udpmc_admin_t instance.
+ * @param scope                scope of the topic.
+ * @param topic                name of the topic.
+ * @param serializerSvcId      service id of the serializer the sender tracks.
+ * @param outPublisherEndpoint receives the new endpoint properties, which the
+ *                             caller then owns; may be NULL, in which case
+ *                             the endpoint is destroyed here.
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+    pubsub_udpmc_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+    if (sender != NULL) {
+        free(key);
+        LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+    } else {
+        sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId);
+        if (sender == NULL) {
+            LOG_ERROR("[PSA UDPMC] Error creating TopicSender for scope/topic %s/%s", scope, topic);
+            free(key);
+        } else {
+            const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
+            const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                    PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+            bool valid = pubsubEndpoint_isValid(newEndpoint, true, true);
+            if (!valid) {
+                LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid");
+                celix_properties_destroy(newEndpoint);
+                newEndpoint = NULL; //must not be handed to the caller after destruction
+                pubsub_udpmcTopicSender_destroy(sender);
+                free(key);
+            } else {
+                //the map takes ownership of key; it is freed on teardown
+                hashMap_put(psa->topicSenders.map, key, sender);
+                //TODO connect endpoints to sender
+            }
+        }
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    if (newEndpoint != NULL) {
+        if (outPublisherEndpoint != NULL) {
+            *outPublisherEndpoint = newEndpoint;
+        } else {
+            //nobody to take ownership; do not leak the properties
+            celix_properties_destroy(newEndpoint);
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Removes the topic sender registered for scope/topic and destroys it.
+ * When no such sender is registered an error is logged.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+    pubsub_udpmc_admin_t *psa = handle;
+
+    //temporary key, only used for the lookup
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_entry_t *found = hashMap_getEntry(psa->topicSenders.map, lookupKey);
+    if (found == NULL) {
+        LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic);
+    } else {
+        //fetch the key stored in the map before the entry is removed, so it can be freed
+        char *storedKey = hashMapEntry_getKey(found);
+        pubsub_updmc_topic_sender_t *removed = hashMap_remove(psa->topicSenders.map, lookupKey);
+        free(storedKey);
+        //TODO disconnect endpoints to sender
+        pubsub_udpmcTopicSender_destroy(removed);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    free(lookupKey);
+    return CELIX_SUCCESS;
+}
+
+/* Stub: only logs the request; no topic receiver is created yet. Always returns CELIX_SUCCESS. */
+celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) {
+    pubsub_udpmc_admin_t *psa = handle; //used by LOG_INFO
+
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_setupTopicReciever. scope/topic: %s/%s", scope, topic);
+    return CELIX_SUCCESS;
+}
+
+/* Stub: only logs the request; no topic receiver is removed yet. Always returns CELIX_SUCCESS. */
+celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) {
+    pubsub_udpmc_admin_t *psa = handle; //used by LOG_INFO
+
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_teardownTopicReciever. scope/topic: %s/%s", scope, topic);
+    return CELIX_SUCCESS;
+}
+
+/* Stub: only logs the call; the endpoint is not processed yet. Always returns CELIX_SUCCESS. */
+celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_udpmc_admin_t *psa = handle; //used by LOG_INFO
+
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_addEndpoint");
+    return CELIX_SUCCESS;
+}
+
+/* Stub: only logs the call; the endpoint is not processed yet. Always returns CELIX_SUCCESS. */
+celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+    pubsub_udpmc_admin_t *psa = handle; //used by LOG_INFO
+
+    LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_removeEndpoint");
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Shell command handler: prints an overview of all registered topic senders
+ * (scope/topic, psa type, serializer type and url) to `out`.
+ *
+ * The serializer type and the socket address of a sender may be NULL; they
+ * are printed as "(null)" instead of passing NULL to a %s conversion.
+ *
+ * @param handle      the pubsub_udpmc_admin_t instance.
+ * @param commandLine unused.
+ * @param out         stream the overview is written to.
+ * @param errStream   unused.
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
+    pubsub_udpmc_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    fprintf(out, "\nTopic Senders:\n");
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+        const char *psaType = pubsub_udpmcTopicSender_psaType(sender);
+        const char *serType = pubsub_udpmcTopicSender_serializerType(sender);
+        const char *scope = pubsub_udpmcTopicSender_scope(sender);
+        const char *topic = pubsub_udpmcTopicSender_topic(sender);
+        const char *url = pubsub_udpmcTopicSender_socketAddress(sender);
+        fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+        fprintf(out, "   |- psa type        = %s\n", psaType);
+        //serType is NULL while no serializer is set; url is NULL while no address is assigned
+        fprintf(out, "   |- serializer type = %s\n", serType != NULL ? serType : "(null)");
+        fprintf(out, "   |- url             = %s\n", url != NULL ? url : "(null)");
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    fprintf(out, "\n");
+
+    //TODO topic receivers
+
+    return status;
+}
+
+#ifndef ANDROID
+/**
+ * Looks up the numeric IPv4 address of a network interface.
+ *
+ * Walks the list returned by getifaddrs() and takes the first AF_INET entry
+ * that matches: any interface when `interface` is NULL, otherwise the one
+ * whose name equals `interface`.
+ *
+ * @param interface interface name to look for, or NULL for the first IPv4 one.
+ * @param ip        on success receives a heap-allocated string that the
+ *                  caller must free(); untouched on failure.
+ * @return CELIX_SUCCESS when an address was found and copied, otherwise
+ *         CELIX_BUNDLE_EXCEPTION.
+ */
+static celix_status_t udpmc_getIpAddress(const char* interface, char** ip) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    struct ifaddrs *ifaddr = NULL;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) == -1) {
+        return status;
+    }
+
+    for (struct ifaddrs *ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) {
+        //check the family first: the sockaddr_in length passed to getnameinfo()
+        //below is only correct for AF_INET addresses
+        if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET) {
+            continue;
+        }
+        if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0) {
+            continue;
+        }
+        if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0) {
+            continue;
+        }
+
+        char *copy = strdup(host);
+        if (copy != NULL) {
+            //only report success when the caller really gets a string
+            *ip = copy;
+            status = CELIX_SUCCESS;
+        }
+    }
+
+    freeifaddrs(ifaddr);
+
+    return status;
+}
+#endif
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
new file mode 100644
index 0000000..011d272
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -0,0 +1,92 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef CELIX_PUBSUB_UDPMC_ADMIN_H
+#define CELIX_PUBSUB_UDPMC_ADMIN_H
+
+#include "celix_api.h"
+#include "log_helper.h"
+
+/* Admin type identifier used when matching publishers, subscribers and endpoints. */
+#define PUBSUB_UDPMC_ADMIN_TYPE                     "udpmc"
+#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"pubsub.udpmc.socket_address"
+
+/* Bundle context property keys read when the admin is created. */
+#define PUBSUB_UDPMC_IP_KEY 	                    "PSA_IP"
+#define PUBSUB_UDPMC_ITF_KEY	                    "PSA_INTERFACE"
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY        "PSA_MC_PREFIX"
+#define PUBSUB_UDPMC_VERBOSE_KEY                    "PSA_UDPMC_VERBOSE"
+
+/* Defaults for the properties above. The verbose default is a bool (not a
+ * string) because it is passed as the default value of
+ * celix_bundleContext_getPropertyAsBool(). */
+#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
+#define PUBSUB_UDPMC_VERBOSE_DEFAULT                true
+
+
+/* State of the UDP multicast pubsub admin. */
+typedef struct pubsub_udpmc_admin {
+    celix_bundle_context_t *ctx;
+    log_helper_t *log;
+    char* ifIpAddress; // The local interface which is used for multicast communication
+    char* mcIpAddress; // The multicast IP address
+    int sendSocket; // UDP socket configured for multicast sending
+    double qosSampleScore; // scores passed to the pubsub_utils match functions
+    double qosControlScore;
+    double defaultScore;
+    bool verbose; // when true, informational log lines are emitted
+    const char *fwUUID; // framework UUID property; not owned by the admin
+
+    struct {
+        celix_thread_mutex_t mutex; // guards map
+        hash_map_t *map; //key = scope:topic key, value = pubsub_updmc_topic_sender_t
+    } topicSenders;
+
+    struct {
+        celix_thread_mutex_t mutex; // guards map
+        hash_map_t *map; //key = scope:topic key, value = topic receiver (TODO: type not defined yet)
+    } topicReceivers;
+
+    struct {
+        celix_thread_mutex_t mutex; // guards map
+        hash_map_t *map; //key = endpoint uuid, value = presumably psa_udpmc_connected_endpoint_t (map is not filled yet)
+    } connectedEndpoints;
+
+} pubsub_udpmc_admin_t;
+
+/* Bookkeeping entry for an endpoint that is connected to a sender or receiver. */
+typedef struct psa_udpmc_connected_endpoint {
+    void *sender; //set if the connected endpoint is a subscriber. TODO: use a concrete type
+    void *receiver; //set if the connected endpoint is a publisher. TODO: use a concrete type
+    char *endpointUUID; //uuid of the connected endpoint
+} psa_udpmc_connected_endpoint_t;
+
+/* Creates an admin configured from the bundle context properties; release it
+ * with pubsub_udpmcAdmin_destroy(). */
+pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+/* Destroys an admin created with pubsub_udpmcAdmin_create(); NULL is ignored. */
+void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
+
+/* Match functions: `handle` is the pubsub_udpmc_admin_t; the resulting score
+ * is written to `score`. */
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score);
+
+/* Creates/removes the topic sender for a scope/topic pair. On successful
+ * setup the new publisher endpoint is stored in *publisherEndpoint. */
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+/* Topic receiver setup/teardown; the current implementations only log the call. */
+celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic);
+
+/* Endpoint add/remove notifications; the current implementations only log the call. */
+celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+
+/* Shell command handler: prints an overview of the topic senders to outStream. */
+celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream);
+
+#endif //CELIX_PUBSUB_UDPMC_ADMIN_H
+

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
new file mode 100644
index 0000000..5553d3b
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -0,0 +1,99 @@
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include "pubsub_udpmc_topic_sender.h"
+#include "pubsub_psa_udpmc_constants.h"
+
+static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props);
+
+/* State kept for one UDP multicast topic sender. */
+struct pubsub_updmc_topic_sender {
+    celix_bundle_context_t *ctx; // bundle context used to start/stop the serializer tracker
+    char *scope; // owned copy of the scope, freed on destroy
+    char *topic; // owned copy of the topic, freed on destroy
+    char *socketAddress; // NOTE(review): never assigned in this file, so currently always NULL
+
+    long serTrackerId; // id of the service tracker for the serializer service
+    struct {
+        celix_thread_mutex_t mutex; // guards svc and props
+        pubsub_serializer_service_t *svc; // serializer currently set by the tracker callback
+        const celix_properties_t *props; // service properties of that serializer; not owned
+    } serializer;
+};
+
+/**
+ * Creates a topic sender for scope/topic and starts tracking the serializer
+ * service with the given service id.
+ *
+ * @param ctx             bundle context used for the service tracker.
+ * @param scope           scope name; copied (at most 1 MiB).
+ * @param topic           topic name; copied (at most 1 MiB).
+ * @param serializerSvcId service id of the serializer to track.
+ * @return the new sender (release with pubsub_udpmcTopicSender_destroy()),
+ *         or NULL when scope/topic is NULL or memory allocation fails.
+ */
+pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, long serializerSvcId) {
+    if (scope == NULL || topic == NULL) {
+        return NULL;
+    }
+
+    pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    if (sender == NULL) {
+        return NULL;
+    }
+    sender->ctx = ctx;
+    sender->scope = strndup(scope, 1024 * 1024);
+    sender->topic = strndup(topic, 1024 * 1024);
+    if (sender->scope == NULL || sender->topic == NULL) {
+        free(sender->scope);
+        free(sender->topic);
+        free(sender);
+        return NULL;
+    }
+
+    celixThreadMutex_create(&sender->serializer.mutex, NULL);
+
+    //select exactly the serializer service chosen for this sender
+    char filter[64];
+    snprintf(filter, sizeof(filter), "(service.id=%li)", serializerSvcId);
+
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+    opts.filter.filter = filter;
+    opts.filter.ignoreServiceLanguage = true;
+    opts.callbackHandle = sender;
+    opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer;
+    sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+    return sender;
+}
+
+/**
+ * Destroys a topic sender: stops the serializer tracker, destroys the
+ * serializer mutex and frees the scope/topic copies and the sender itself.
+ * A NULL sender is ignored.
+ */
+void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) {
+    if (sender == NULL) {
+        return;
+    }
+
+    //stop the tracker first so its callback no longer uses the mutex
+    celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId);
+    celixThreadMutex_destroy(&sender->serializer.mutex);
+
+    free(sender->scope);
+    free(sender->topic);
+    free(sender);
+}
+
+/* Returns the admin type constant PSA_UDPMC_PUBSUB_ADMIN_TYPE; the sender itself is not used. */
+const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender __attribute__((unused))) {
+    return PSA_UDPMC_PUBSUB_ADMIN_TYPE;
+}
+
+/**
+ * Returns the PUBSUB_SERIALIZER_TYPE_KEY property of the serializer that is
+ * currently set for this sender, or NULL when no serializer properties are
+ * set (or the property is absent).
+ */
+const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender) {
+    const char *serType = NULL;
+
+    celixThreadMutex_lock(&sender->serializer.mutex);
+    const celix_properties_t *serProps = sender->serializer.props;
+    if (serProps != NULL) {
+        serType = celix_properties_get(serProps, PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    }
+    celixThreadMutex_unlock(&sender->serializer.mutex);
+
+    return serType;
+}
+
+/* Returns the sender's scope string; it stays owned by the sender. */
+const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender) {
+    return sender->scope;
+}
+
+/* Returns the sender's topic string; it stays owned by the sender. */
+const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender) {
+    return sender->topic;
+}
+
+/* Returns the sender's socket address. NOTE(review): the field is never
+ * assigned in this file, so this currently returns NULL. */
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender) {
+    return sender->socketAddress;
+}
+
+/* Placeholder for connecting this sender to the given endpoint; not implemented yet. */
+void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+    //TODO
+}
+
+/* Placeholder for disconnecting this sender from the given endpoint; not implemented yet. */
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) {
+    //TODO
+}
+
+/*
+ * Service tracker callback (registered as opts.setWithProperties): stores the
+ * serializer service and its properties in the sender, under the serializer
+ * mutex. `handle` is the sender passed as callbackHandle.
+ */
+static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) {
+    pubsub_updmc_topic_sender_t *self = handle;
+
+    celixThreadMutex_lock(&self->serializer.mutex);
+    self->serializer.svc = svc;
+    self->serializer.props = props;
+    celixThreadMutex_unlock(&self->serializer.mutex);
+}
\ No newline at end of file