You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 14:23:04 UTC

[5/6] celix git commit: CELIX-389: Adds Celix Publish Subscribe donation.

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
new file mode 100644
index 0000000..94331d9
--- /dev/null
+++ b/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -0,0 +1,164 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_publisher.c
+ *
+ *  \date       Sep 21, 2010
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+#include "service_tracker.h"
+#include "celix_threads.h"
+
+#include "pubsub_common.h"
+#include "poi.h"
+
+#include "pubsub_publisher_private.h"
+
+/* File-wide stop flag: reset to false in publisher_publishSvcAdded, set to true in
+ * publisher_publishSvcRemoved and polled by the send_thread loop.
+ * NOTE(review): plain bool shared between threads with no synchronization visible here - confirm. */
+static bool stop=false;
+
+/*
+ * Maps the next rand() value linearly onto the interval [min, max].
+ */
+static double randCoordinate(double min, double max){
+	const double span = max - min;
+	const double scale = ((double)RAND_MAX) / span;
+
+	return min + (((double)rand()) / scale);
+}
+
+/*
+ * Thread entry point: every two seconds publishes a randomly positioned location
+ * on the topic carried in the thread argument, until the file-level 'stop' flag is set.
+ *
+ * arg is a heap-allocated send_thread_struct_pt; this function frees it before returning.
+ * Always returns NULL.
+ */
+static void* send_thread(void* arg){
+
+	send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
+
+	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
+	pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher;
+
+	/* Keep at most the first 8 characters of the identifier. The precision bounds the
+	 * read, so an identifier shorter than 8 characters is never read past its end. */
+	char fwUUID[9];
+	snprintf(fwUUID,sizeof(fwUUID),"%.8s",(publisher->ident != NULL) ? publisher->ident : "");
+
+	location_t place = calloc(1,sizeof(*place));
+	char* desc = calloc(64,sizeof(char));
+	char* name = calloc(64,sizeof(char));
+	if(place == NULL || desc == NULL || name == NULL){
+		printf("PUBLISHER: out of memory, sender thread not started\n");
+		free(name);
+		free(desc);
+		free(place);
+		free(st_struct);
+		return NULL;
+	}
+
+	snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+	snprintf(name,64,"Bundle#%ld",publisher->bundleId);
+
+	place->name = name;
+	place->description = desc;
+	place->extra = "DONT PANIC";
+	printf("TOPIC : %s\n",st_struct->topic);
+	unsigned int msgId = 0;
+	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0 ){
+
+		while(stop==false){
+			place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
+			place->position.lon = randCoordinate(MIN_LON,MAX_LON);
+			int nr_char = (int)randCoordinate(5,100000);
+			/* Payload is a zero-terminated run of digits with a random length. */
+			place->data = calloc(nr_char, 1);
+			if(place->data == NULL){
+				printf("PUBLISHER: out of memory, skipping message for %s\n", st_struct->topic);
+				sleep(2);
+				continue;
+			}
+			for(int i = 0; i < (nr_char-1); i++) {
+				place->data[i] = i%10 + '0';
+			}
+			if(publish_svc->send) {
+				publish_svc->send(publish_svc->handle,msgId,place);
+			} else {
+				printf("No send for %s\n", st_struct->topic);
+			}
+			printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
+			free(place->data);
+			place->data = NULL;
+			sleep(2);
+		}
+	}
+	else{
+		printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_POI_NAME);
+	}
+
+	free(place->description);
+	free(place->name);
+	free(place);
+
+	free(st_struct);
+
+
+	return NULL;
+
+}
+
+/*
+ * Allocates a publisher handle and stores the given trackers, identifier and bundle id in it.
+ * The pointers are stored as given (not copied). An empty thread map is created for the
+ * sender threads added later.
+ *
+ * Returns the new handle, or NULL when the allocation fails.
+ */
+pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
+	pubsub_sender_pt publisher = malloc(sizeof(*publisher));
+	if(publisher == NULL){
+		return NULL;
+	}
+
+	publisher->trackers = trackers;
+	publisher->ident = ident;
+	publisher->bundleId = bundleId;
+	publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
+
+	return publisher;
+}
+
+/* Announces publisher start-up on stdout; the client handle is not used. */
+void publisher_start(pubsub_sender_pt client) {
+	(void)client;
+	fputs("PUBLISHER: starting up...\n", stdout);
+}
+
+/* Announces publisher shutdown on stdout; the client handle is not used. */
+void publisher_stop(pubsub_sender_pt client) {
+	(void)client;
+	fputs("PUBLISHER: stopping...\n", stdout);
+}
+
+/*
+ * Releases a handle obtained from publisher_create. The thread map is destroyed without
+ * freeing its keys or values; trackers and ident are only detached, not freed.
+ * A NULL client is accepted and ignored.
+ */
+void publisher_destroy(pubsub_sender_pt client) {
+	if(client == NULL){
+		return;
+	}
+	hashMap_destroy(client->tid_map, false, false);
+	client->tid_map = NULL;
+	client->trackers = NULL;
+	client->ident = NULL;
+	free(client);
+}
+
+/*
+ * Callback for a newly available publish service: starts a send_thread for the topic
+ * named by the PUBSUB_PUBLISHER_TOPIC property of the reference and remembers the
+ * thread handle in the manager's tid_map, keyed by the service pointer.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when an allocation fails, or the status of
+ * celixThread_create when the thread cannot be started.
+ */
+celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
+	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
+	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+
+	printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
+
+	send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
+	celix_thread_t *tid = malloc(sizeof(*tid));
+	if(data == NULL || tid == NULL){
+		free(data);
+		free(tid);
+		return CELIX_ENOMEM;
+	}
+
+	const char *value = NULL;
+	serviceReference_getProperty(reference, PUBSUB_PUBLISHER_TOPIC, &value);
+	data->service = publish_svc;
+	data->publisher = manager;
+	data->topic = value;
+
+	stop=false;
+	celix_status_t status = celixThread_create(tid,NULL,send_thread,(void*)data);
+	if(status != CELIX_SUCCESS){
+		/* The thread never ran, so it did not take ownership of 'data'. */
+		free(data);
+		free(tid);
+		return status;
+	}
+
+	hashMap_put(manager->tid_map, publish_svc, tid);
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Callback for a disappearing publish service: raises the stop flag, joins the sender
+ * thread registered for this service and releases its handle.
+ *
+ * The entry is removed from tid_map so no pointer to the freed handle stays behind.
+ * When no thread is registered for the service nothing is joined.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
+	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+	celix_thread_t *tid = hashMap_remove(manager->tid_map, service);
+	if(tid == NULL){
+		printf("PUBLISHER: publish service unexporting (%s), no sender thread registered!\n",manager->ident);
+		return CELIX_SUCCESS;
+	}
+	printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread);
+	stop=true;
+	celixThread_join(*tid,NULL);
+	free(tid);
+	return CELIX_SUCCESS;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt
new file mode 100644
index 0000000..270a881
--- /dev/null
+++ b/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Header search paths; the private headers come from the sibling "publisher" example.
+include_directories("../publisher/private/include")
+include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+
+# Second POI publisher bundle, compiled from the source files under ../publisher.
+add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher2
+    SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2"
+    VERSION "1.0.0"
+    SOURCES 
+    	../publisher/private/src/ps_pub_activator.c
+    	../publisher/private/src/pubsub_publisher.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
+)
+
+# Message descriptors for both POI message types.
+bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
+	${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
+	${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+    DESTINATION "META-INF/descriptors"
+)
+
+# Topic property files, packaged under the publisher ("pub") topics directory.
+bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
+		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+    DESTINATION "META-INF/topics/pub"
+)
+
+# The publisher's own key directory.
+bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
+		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher
+    DESTINATION "META-INF/keys"
+)
+
+# Only the "public" part of the subscriber keys is packaged with the publisher.
+bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
+		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber/public
+    DESTINATION "META-INF/keys/subscriber"
+)
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt
new file mode 100644
index 0000000..047dbb7
--- /dev/null
+++ b/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Header search paths for the subscriber example.
+include_directories("private/include")
+include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/common/include")
+
+# POI subscriber bundle: activator, receive callback and the shared pubsub utilities.
+add_bundle(org.apache.celix.pubsub_subscriber.PoiSubscriber
+    SYMBOLIC_NAME "apache_celix_pubsub_poi_subscriber" 
+    VERSION "1.0.0"
+    SOURCES 
+    	private/src/ps_sub_activator.c
+    	private/src/pubsub_subscriber.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
+)
+
+# Message descriptors for both POI message types.
+bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
+	    ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
+	    ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+    DESTINATION "META-INF/descriptors"
+)
+
+# Topic property files, packaged under the subscriber ("sub") topics directory.
+bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
+		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+    DESTINATION "META-INF/topics/sub"
+)
+
+# The subscriber's own key directory.
+bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
+		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber
+    DESTINATION "META-INF/keys"
+)
+
+# Only the "public" part of the publisher keys is packaged with the subscriber.
+bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
+		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher/public
+    DESTINATION "META-INF/keys/publisher"
+)

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
new file mode 100644
index 0000000..c6072df
--- /dev/null
+++ b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -0,0 +1,52 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_subscriber_private.h
+ *
+ *  \date       Sep 21, 2010
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
+#define PUBSUB_SUBSCRIBER_PRIVATE_H_
+
+
+#include <string.h>
+
+#include "celixbool.h"
+
+#include "pubsub_common.h"
+#include "subscriber.h"
+
+/* State of the example subscriber. */
+struct pubsub_receiver {
+	char * name;	/* heap-allocated string owned by the receiver */
+};
+
+typedef struct pubsub_receiver* pubsub_receiver_pt;
+
+/* Lifecycle of a receiver: create, start, stop, destroy. */
+pubsub_receiver_pt subscriber_create(char* topics);
+void subscriber_start(pubsub_receiver_pt client);
+void subscriber_stop(pubsub_receiver_pt client);
+void subscriber_destroy(pubsub_receiver_pt client);
+
+/* Receive callback with the signature expected for a subscriber service's 'receive' member. */
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
+
+
+#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
new file mode 100644
index 0000000..efd34c9
--- /dev/null
+++ b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
@@ -0,0 +1,123 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * ps_sub_activator.c
+ *
+ *  \date       Sep 21, 2010
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+
+#include "pubsub_common.h"
+#include "pubsub_utils.h"
+#include "pubsub_subscriber_private.h"
+
+#define SUB_TOPIC "poi1;poi2"
+
+/* Per-bundle activator state, created in bundleActivator_create and released in bundleActivator_destroy. */
+struct subscriberActivator {
+	array_list_pt registrationList; //List<service_registration_pt>
+	pubsub_subscriber_pt subsvc;	/* subscriber service struct allocated in bundleActivator_start */
+};
+
+/*
+ * Allocates the activator state and its (empty) registration list and hands it back
+ * through userData.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when the state cannot be allocated, or the status
+ * of arrayList_create when the list cannot be created. userData is only written on success.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	struct subscriberActivator * act = calloc(1,sizeof(*act));
+	if(act == NULL){
+		return CELIX_ENOMEM;
+	}
+
+	celix_status_t status = arrayList_create(&(act->registrationList));
+	if(status != CELIX_SUCCESS){
+		free(act);
+		return status;
+	}
+
+	*userData = act;
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Creates the subscriber service and registers it once per topic listed in SUB_TOPIC.
+ *
+ * Topics whose name is MAX_TOPIC_LEN characters or longer are skipped with a message.
+ * Only successful registrations are stored in the activator's registration list.
+ * Returns CELIX_SUCCESS, or CELIX_ENOMEM when the service or receiver cannot be allocated.
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	struct subscriberActivator * act = (struct subscriberActivator *) userData;
+
+
+	pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
+	if(subsvc == NULL){
+		return CELIX_ENOMEM;
+	}
+	pubsub_receiver_pt sub = subscriber_create(SUB_TOPIC);
+	if(sub == NULL){
+		free(subsvc);
+		return CELIX_ENOMEM;
+	}
+	subsvc->handle = sub;
+	subsvc->receive = pubsub_subscriber_recv;
+
+	act->subsvc = subsvc;
+
+	array_list_pt topic_list = pubsub_getTopicsFromString(SUB_TOPIC);
+
+	if(topic_list !=NULL){
+
+		int i;
+		for(i=0; i<arrayList_size(topic_list);i++){
+			char* topic = arrayList_get(topic_list,i);
+			if(strlen(topic)<MAX_TOPIC_LEN){
+				properties_pt props = properties_create();
+				properties_set(props, PUBSUB_SUBSCRIBER_TOPIC,topic);
+#ifdef USE_SCOPE
+				char *scope;
+				asprintf(&scope, "my_scope_%d", i);
+				properties_set(props,SUBSCRIBER_SCOPE,scope);
+				free(scope);
+#endif
+				service_registration_pt reg = NULL;
+				celix_status_t rc = bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, &reg);
+				if(rc == CELIX_SUCCESS && reg != NULL){
+					arrayList_add(act->registrationList,reg);
+				}
+				else{
+					/* Never store a NULL handle: stop unregisters every list entry. */
+					printf("Cannot register subscriber for topic %s.\n",topic);
+				}
+			}
+			else{
+				printf("Topic %s too long. Skipping subscription.\n",topic);
+			}
+			/* Each list element is freed here, then the list itself after the loop. */
+			free(topic);
+		}
+		arrayList_destroy(topic_list);
+
+	}
+
+	subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Unregisters every service registered by bundleActivator_start and stops the receiver.
+ *
+ * The registration list is emptied afterwards so a later start/stop cycle does not
+ * unregister the same (stale) registrations a second time.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	struct subscriberActivator * act = (struct subscriberActivator *) userData;
+
+	int i;
+	for(i=0; i<arrayList_size(act->registrationList);i++){
+		service_registration_pt reg = arrayList_get(act->registrationList,i);
+		if(reg != NULL){
+			serviceRegistration_unregister(reg);
+		}
+	}
+	arrayList_clear(act->registrationList);
+
+	/* subsvc is only set by a successful start. */
+	if(act->subsvc != NULL){
+		subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
+	}
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Releases the receiver, the subscriber service struct, the registration list and the
+ * activator state itself.
+ *
+ * The service struct only exists after bundleActivator_start ran, so it is checked
+ * before use. Always returns CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+
+	struct subscriberActivator * act = (struct subscriberActivator *) userData;
+
+	if(act->subsvc != NULL){
+		act->subsvc->receive = NULL;
+		subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
+		act->subsvc->handle = NULL;
+		free(act->subsvc);
+		act->subsvc = NULL;
+	}
+
+	arrayList_destroy(act->registrationList);
+	free(act);
+
+	return CELIX_SUCCESS;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
new file mode 100644
index 0000000..a137253
--- /dev/null
+++ b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -0,0 +1,64 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_subscriber.c
+ *
+ *  \date       Sep 21, 2010
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "poi.h"
+#include "pubsub_subscriber_private.h"
+
+/*
+ * Allocates a receiver whose name is a private copy of the given topics string.
+ *
+ * A NULL topics argument leaves the name unset. Returns the new receiver, or NULL
+ * when an allocation fails. Release the result with subscriber_destroy.
+ */
+pubsub_receiver_pt subscriber_create(char* topics) {
+	pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
+	if(sub == NULL){
+		return NULL;
+	}
+	if(topics != NULL){
+		sub->name = strdup(topics);
+		if(sub->name == NULL){
+			free(sub);
+			return NULL;
+		}
+	}
+	return sub;
+}
+
+
+/* Announces subscriber start on stdout; the receiver itself is not touched. */
+void subscriber_start(pubsub_receiver_pt subscriber){
+	(void)subscriber;
+	fputs("Subscriber started...\n", stdout);
+}
+
+/* Announces subscriber stop on stdout; the receiver itself is not touched. */
+void subscriber_stop(pubsub_receiver_pt subscriber){
+	(void)subscriber;
+	fputs("Subscriber stopped...\n", stdout);
+}
+
+/* Frees the receiver together with the name string it owns. */
+void subscriber_destroy(pubsub_receiver_pt subscriber){
+	/* free(NULL) is a no-op, so an unset name needs no separate test */
+	free(subscriber->name);
+	subscriber->name = NULL;
+	free(subscriber);
+}
+
+/*
+ * Receive callback: prints the position, name, description, payload length and the
+ * first 25 payload characters of the received location message.
+ *
+ * handle, msgTypeId, callbacks and release are not used. Always returns 0.
+ */
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
+
+	location_t place = (location_t)msg;
+	int nrchars = 25;
+	/* A missing payload is printed as an empty string instead of being passed to strlen. */
+	const char* data = (place->data != NULL) ? place->data : "";
+	/* strlen yields a size_t, which needs %zu; %ld is only correct where long has the same width. */
+	printf("Recv (%s): [%f, %f] (%s, %s) data_len = %zu data =%*.*s\n",msgType, place->position.lat, place->position.lon,place->name,place->description, strlen(data) + 1, nrchars, nrchars, data);
+
+	return 0;
+
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/keygen/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/keygen/CMakeLists.txt b/celix-pubsub/pubsub/keygen/CMakeLists.txt
new file mode 100644
index 0000000..4cb4f5a
--- /dev/null
+++ b/celix-pubsub/pubsub/keygen/CMakeLists.txt
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The key tools are only built when ZMQ security is enabled.
+if (ENABLE_ZMQ_SECURITY)
+
+	find_package(ZMQ REQUIRED)
+	find_package(CZMQ REQUIRED)
+	find_package(OpenSSL 1.1.0 REQUIRED)
+	
+	include_directories("${ZMQ_INCLUDE_DIR}")
+	include_directories("${CZMQ_INCLUDE_DIR}")
+	include_directories("${OPENSSL_INCLUDE_DIR}")
+	
+	# makecert: links against CZMQ only.
+	add_executable(makecert makecert.c)
+	target_link_libraries(makecert ${CZMQ_LIBRARIES})
+	
+	# ed_file: additionally needs ZMQ and the OpenSSL crypto library.
+	add_executable(ed_file ed_file.c)
+	target_link_libraries(ed_file ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
+
+endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/keygen/ed_file.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/keygen/ed_file.c b/celix-pubsub/pubsub/keygen/ed_file.c
new file mode 100644
index 0000000..a0fc7e2
--- /dev/null
+++ b/celix-pubsub/pubsub/keygen/ed_file.c
@@ -0,0 +1,309 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * ed_file.c
+ *
+ *  \date       Dec 2, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <czmq.h>
+#include <openssl/evp.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <strings.h>
+
+#define MAX_KEY_FILE_LENGTH 256
+#define MAX_LINE_LENGTH 64
+#define AES_KEY_LENGTH 32
+#define AES_IV_LENGTH 16
+
+#define KEY_TO_GET "aes_key"
+#define IV_TO_GET "aes_iv"
+
+int generate_sha256_hash(char* text, unsigned char* digest);
+int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext);
+int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
+
+static char* read_keys_file_content(const char *filePath);
+static void parse_key_lines(char *keysBuffer, char **key, char **iv);
+static void parse_key_line(char *line, char **key, char **iv);
+
+/*
+ * Encrypts (default) or decrypts (-d) a file with AES-256-CBC.
+ *
+ * Usage: <key_file> <input_file> <output_file> [-d]
+ * The key file supplies the "aes_key" and "aes_iv" entries; their SHA-256 digests are
+ * used as the actual cipher key and IV. Returns EXIT_SUCCESS when the output file was
+ * written, EXIT_FAILURE otherwise.
+ */
+int main(int argc, const char* argv[])
+{
+	if (argc < 4){
+		printf("Usage: %s <key_file> <input_file> <output_file> [options]\n", argv[0]);
+		printf("Default behavior: encrypting a file\n");
+		printf("Options:\n");
+		printf("\t-d\tSpecify to decrypt a file\n");
+		printf("\n");
+		return EXIT_FAILURE;
+	}
+
+	int rc = 0;
+
+	const char* keys_file_path = argv[1];
+	const char* input_file_path = argv[2];
+	const char* output_file_path = argv[3];
+
+	bool decryptParam = false;
+	if (argc > 4 && strcmp(argv[4], "-d") == 0){
+		decryptParam = true;
+	}
+
+	if (!zsys_file_exists(keys_file_path)){
+		printf("Keys file '%s' doesn't exist!\n", keys_file_path);
+		return EXIT_FAILURE;
+	}
+
+	if (!zsys_file_exists(input_file_path)){
+		printf("Input file does not exist!\n");
+		return EXIT_FAILURE;
+	}
+
+	char* keys_data = read_keys_file_content(keys_file_path);
+	if (keys_data == NULL){
+		return EXIT_FAILURE;
+	}
+
+	char* key = NULL;
+	char* iv = NULL;
+	parse_key_lines(keys_data, &key, &iv);
+	free(keys_data);
+
+	if (key == NULL || iv == NULL){
+		printf("Loading AES key and/or AES iv failed!\n");
+		free(key);
+		free(iv);
+		return EXIT_FAILURE;
+	}
+
+	printf("Using AES Key \t\t'%s'\n", key);
+	printf("Using AES IV \t\t'%s'\n", iv);
+	printf("Input file path \t'%s'\n", input_file_path);
+	printf("Output file path \t'%s'\n", output_file_path);
+	printf("Decrypting \t\t'%s'\n\n", (decryptParam) ? "true" : "false");
+
+	unsigned char key_digest[EVP_MAX_MD_SIZE];
+	unsigned char iv_digest[EVP_MAX_MD_SIZE];
+	generate_sha256_hash((char*) key, key_digest);
+	generate_sha256_hash((char*) iv, iv_digest);
+
+	zchunk_t* input_chunk = zchunk_slurp (input_file_path, 0);
+	if (input_chunk == NULL){
+		printf("Input file not correct!\n");
+		free(key);
+		free(iv);
+		return EXIT_FAILURE;
+	}
+
+	//Load input data from file
+	int input_file_size = (int) zchunk_size (input_chunk);
+	char* input_file_data = zchunk_strdup(input_chunk);
+	zchunk_destroy (&input_chunk);
+
+	/* CBC encryption pads the data up to the next full block, so the result can be
+	 * larger than the input; one extra byte holds the terminator added after
+	 * decryption. The buffer lives on the heap so large files cannot exhaust the stack. */
+	size_t output_capacity = (size_t) input_file_size + EVP_MAX_BLOCK_LENGTH + 1;
+	unsigned char* output = malloc(output_capacity);
+	if (input_file_data == NULL || output == NULL){
+		printf("Out of memory!\n");
+		free(output);
+		free(input_file_data);
+		free(key);
+		free(iv);
+		return EXIT_FAILURE;
+	}
+
+	int output_len = 0;
+	if (decryptParam){
+		output_len = decrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
+	}else{
+		output_len = encrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
+	}
+	if (output_len < 0 || (size_t) output_len >= output_capacity){
+		printf("Processing the input data failed!\n");
+		free(output);
+		free(input_file_data);
+		free(key);
+		free(iv);
+		return EXIT_FAILURE;
+	}
+	if (decryptParam){
+		output[output_len] = '\0';
+	}
+
+	//Write output data to file
+	int exit_code = EXIT_SUCCESS;
+	zfile_t* output_file = zfile_new (".", output_file_path);
+	zchunk_t* output_chunk = zchunk_new(output, output_len);
+	rc = zfile_output (output_file);
+	if (rc != 0){
+		printf("Problem with opening file for writing!\n");
+		exit_code = EXIT_FAILURE;
+	}else{
+		rc = zfile_write (output_file, output_chunk, 0);
+		if (rc != 0){
+			printf("Problem with writing output to file!\n");
+			exit_code = EXIT_FAILURE;
+		}else{
+			printf("Output written to file:\n");
+			if (decryptParam){
+				printf("%s\n", output);
+			}else{
+				BIO_dump_fp (stdout, (const char *) output, output_len);
+			}
+		}
+	}
+
+	zchunk_destroy (&output_chunk);
+	zfile_close (output_file);
+	zfile_destroy (&output_file);
+	free(output);
+	free(input_file_data);
+	free(key);
+	free(iv);
+
+	return exit_code;
+}
+
+/*
+ * Computes the SHA-256 digest of the zero-terminated string 'text' into 'digest'.
+ *
+ * 'digest' must have room for the digest written by EVP_DigestFinal_ex.
+ * Returns the digest length in bytes, or 0 when the digest could not be computed.
+ */
+int generate_sha256_hash(char* text, unsigned char* digest)
+{
+	unsigned int digest_len = 0;
+
+	EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+	if (mdctx == NULL){
+		return 0;
+	}
+
+	if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
+			|| EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
+			|| EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
+		digest_len = 0;
+	}
+	EVP_MD_CTX_free(mdctx);
+
+	return (int) digest_len;
+}
+
+/*
+ * Encrypts 'plaintext' with AES-256-CBC using the given key and IV.
+ *
+ * 'ciphertext' must have room for plaintext_len plus one cipher block, because the
+ * final block is padded. Returns the number of bytes written, or 0 when any
+ * OpenSSL step fails.
+ */
+int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext)
+{
+	int len = 0;
+	int ciphertext_len = 0;
+
+	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+	if (ctx == NULL){
+		return 0;
+	}
+
+	if (EVP_EncryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+			&& EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, plaintext_len) == 1){
+		ciphertext_len = len;
+		if (EVP_EncryptFinal_ex(ctx, ciphertext + ciphertext_len, &len) == 1){
+			ciphertext_len += len;
+		}else{
+			ciphertext_len = 0;
+		}
+	}
+
+	EVP_CIPHER_CTX_free(ctx);
+
+	return ciphertext_len;
+}
+
+/*
+ * Decrypts AES-256-CBC 'ciphertext' with the given key and IV.
+ *
+ * 'plaintext' must have room for ciphertext_len plus one cipher block. Returns the
+ * number of plaintext bytes written, or 0 when any OpenSSL step fails (for example
+ * when the final padding check fails because key or IV do not match).
+ */
+int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
+{
+	int len = 0;
+	int plaintext_len = 0;
+
+	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+	if (ctx == NULL){
+		return 0;
+	}
+
+	if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+			&& EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
+		plaintext_len = len;
+		if (EVP_DecryptFinal_ex(ctx, plaintext + plaintext_len, &len) == 1){
+			plaintext_len += len;
+		}else{
+			plaintext_len = 0;
+		}
+	}
+
+	EVP_CIPHER_CTX_free(ctx);
+
+	return plaintext_len;
+}
+
+/*
+ * Reads the whole keys file into a freshly allocated, zero-terminated string.
+ *
+ * The path is split into directory and file name on a private copy, so the caller's
+ * string is left untouched. Returns the file content (to be released with free), or
+ * NULL when the file cannot be opened or read or memory runs out.
+ */
+static char* read_keys_file_content(const char *keys_file_path){
+	char* keys_file_full_path = strndup(keys_file_path, MAX_KEY_FILE_LENGTH);
+	char* split_path = strdup(keys_file_path);
+	if (keys_file_full_path == NULL || split_path == NULL){
+		printf("Out of memory while reading keys file!\n");
+		free(split_path);
+		free(keys_file_full_path);
+		return NULL;
+	}
+
+	/* Without a '/' the file is looked up in the current directory. */
+	const char* keys_file_dir = ".";
+	const char* keys_file_name = split_path;
+
+	char* sep_kf_at = strrchr(split_path, '/');
+	if (sep_kf_at != NULL){
+		*sep_kf_at = '\0';
+		keys_file_dir = split_path;
+		keys_file_name = sep_kf_at + 1;
+	}
+
+	printf("Keys file path: %s\n", keys_file_full_path);
+
+	int rc = 0;
+
+	zfile_t* keys_file = zfile_new (keys_file_dir, keys_file_name);
+	rc = zfile_input (keys_file);
+	if (rc != 0){
+		printf("Keys file '%s' not readable!\n", keys_file_full_path);
+		zfile_destroy(&keys_file);
+		free(split_path);
+		free(keys_file_full_path);
+		return NULL;
+	}
+
+	ssize_t keys_file_size = zsys_file_size (keys_file_full_path);
+	zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
+	if (keys_chunk == NULL){
+		printf("Can't read file '%s'!\n", keys_file_full_path);
+		zfile_close(keys_file);
+		zfile_destroy(&keys_file);
+		free(split_path);
+		free(keys_file_full_path);
+		return NULL;
+	}
+
+	char* keys_data = zchunk_strdup(keys_chunk);
+	zchunk_destroy(&keys_chunk);
+	zfile_close(keys_file);
+	zfile_destroy (&keys_file);
+	free(split_path);
+	free(keys_file_full_path);
+
+	return keys_data;
+}
+
+/*
+ * Walks the newline-separated lines of keysBuffer (which is modified by the tokenizer)
+ * and feeds each one to parse_key_line until both *key and *iv are set or the
+ * buffer is exhausted.
+ */
+static void parse_key_lines(char *keysBuffer, char **key, char **iv){
+	char *cursor = NULL;
+	char *current = strtok_r(keysBuffer, "\n", &cursor);
+
+	while (current != NULL){
+		parse_key_line(current, key, iv);
+
+		if (*key != NULL && *iv != NULL){
+			break;
+		}
+		current = strtok_r(NULL, "\n", &cursor);
+	}
+
+}
+
+/*
+ * Splits one "name:value" line at its first ':' (the line is modified in place) and,
+ * when the name matches KEY_TO_GET or IV_TO_GET and the target is still unset, stores
+ * a length-limited copy of the value in *key or *iv. Lines without a ':' or with an
+ * empty name or value are ignored.
+ */
+static void parse_key_line(char *line, char **key, char **iv){
+	char* colon = strchr(line, ':');
+	if (colon == NULL){
+		return;
+	}
+
+	*colon = '\0'; // cut the line in two at the first separator
+	const char* label = line;
+	const char* content = colon + 1;
+
+	if (label[0] == '\0' || content[0] == '\0'){
+		return;
+	}
+
+	if (*key == NULL && strcmp(label, KEY_TO_GET) == 0){
+		*key = strndup(content, AES_KEY_LENGTH);
+		return;
+	}
+	if (*iv == NULL && strcmp(label, IV_TO_GET) == 0){
+		*iv = strndup(content, AES_IV_LENGTH);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/keygen/makecert.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/keygen/makecert.c b/celix-pubsub/pubsub/keygen/makecert.c
new file mode 100644
index 0000000..166111e
--- /dev/null
+++ b/celix-pubsub/pubsub/keygen/makecert.c
@@ -0,0 +1,55 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * makecert.c
+ *
+ *  \date       Dec 2, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <string.h>
+
+#include "czmq.h"
+
+/**
+ * Generates a new CURVE certificate and stores it in a public and a secret file.
+ *
+ * Usage: makecert [<public file> <secret file>]
+ * When exactly two different file names are given they are used as output
+ * files, otherwise "certificate.pub" and "certificate.key" are used.
+ *
+ * Returns 0 on success, 1 when the certificate could not be created or saved.
+ */
+int main (int argc, const char * argv[])
+{
+
+	const char * cert_name_public = "certificate.pub";
+	const char * cert_name_secret = "certificate.key";
+	if (argc == 3 && strcmp(argv[1], argv[2]) != 0){
+		cert_name_public = argv[1];
+		cert_name_secret = argv[2];
+	}
+
+	zcert_t * cert = zcert_new();
+	if (cert == NULL){
+		fprintf(stderr, "E: could not create CURVE certificate\n");
+		return 1;
+	}
+
+	char *timestr = zclock_timestr ();
+	if (timestr != NULL){
+		// zcert_set_meta() takes a printf-style format, so data must never be passed as the format itself
+		zcert_set_meta (cert, "date-created", "%s", timestr);
+		free (timestr);
+	}
+
+	int rc = 0;
+	if (zcert_save_public(cert, cert_name_public) != 0){
+		fprintf(stderr, "E: could not write public certificate to %s\n", cert_name_public);
+		rc = 1;
+	} else if (zcert_save_secret(cert, cert_name_secret) != 0){
+		fprintf(stderr, "E: could not write secret certificate to %s\n", cert_name_secret);
+		rc = 1;
+	}
+
+	// Only report success when both files were really written
+	if (rc == 0){
+		zcert_print (cert);
+		printf("\n");
+		printf("I: CURVE certificate created in %s and %s\n", cert_name_public, cert_name_secret);
+	}
+	zcert_destroy (&cert);
+
+	return rc;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
new file mode 100644
index 0000000..7c8f620
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Build script of the UDP multicast PubSubAdmin bundle.
+# Jansson is mandatory: configuration fails when it cannot be found.
+find_package(Jansson REQUIRED)
+
+include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/dfi/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("private/include")
+include_directories("public/include")
+include_directories("${JANSSON_INCLUDE_DIR}")
+# The serializer locations are optional; each one is only used when its variable is set
+# (the variables are not defined in this file, presumably by a parent CMakeLists.txt).
+if (SERIALIZER_PATH)
+	include_directories("${SERIALIZER_PATH}/include")
+endif()
+if (SERIALIZER_LIB_INCLUDE_DIR)
+	include_directories("${SERIALIZER_LIB_INCLUDE_DIR}")
+endif()
+if (SERIALIZER_LIB_DIR)
+	link_directories("${SERIALIZER_LIB_DIR}")
+endif()
+
+# The bundle compiles its own sources together with shared pubsub_common sources
+# and the serializer sources listed in PUBSUB_SERIALIZER_SRC (defined outside this file).
+add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
+    VERSION "1.0.0"
+    SOURCES
+    	private/src/psa_activator.c
+    	private/src/pubsub_admin_impl.c
+    	private/src/topic_subscription.c
+    	private/src/topic_publication.c
+    	private/src/large_udp.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
+    	${PUBSUB_SERIALIZER_SRC}
+)
+
+# An install RPATH of $ORIGIN makes the installed bundle library look for its dependencies next to itself.
+set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc ${CELIX_LIBRARIES} ${JANSSON_LIBRARIES} ${SERIALIZER_LIBRARY})
+
+install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md b/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md
new file mode 100644
index 0000000..19c7b86
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md
@@ -0,0 +1,62 @@
+#PUBSUB-Admin UDP Multicast
+
+---
+
+##Description
+
+This description is particular for the UDP-Multicast PUB-SUB. 
+
+The UDP multicast pubsub admin is used to transfer user data transparently via UDP multicast. A UDP packet can contain approximately
+64 kB. To overcome this limit the admin has a protocol on top of UDP which fragments the data to be sent; these
+fragments are reassembled at the receiving side.
+
+### IP Addresses
+
+To use UDP multicast, two IP addresses are needed:
+
+1. IP address which is bound to an (ethernet) interface
+2. The multicast address (in the range 224.X.X.X - 239.X.X.X)
+
+When the PubSubAdmin starts it determines the bound IP address. This is done in the order:
+
+1. The first IP number bound to the interface which is set by the "PSA_INTERFACE" property
+2. The interfaces are iterated and the first IP number found is used (typically this is 127.0.0.1, i.e. localhost).
+
+The multicast IP address is determined in the order:
+
+1. If the `PSA_IP` property is defined, this IP will be used as multicast.
+2. If the `PSA_MC_PREFIX` property is defined, this property is used as the first 2 numbers of the multicast address, extended with the last 2 numbers of the bound IP.
+3. If the `PSA_MC_PREFIX` property is not defined `224.100` is used.
+
+### Discovery
+
+When a publisher requests a topic, a TopicSender is created by a ServiceFactory. This TopicSender uses the multicast address as described above with a randomly chosen port number. The combination of the multicast IP address with the port number and protocol (udp) is the endpoint.  
+This endpoint is published by the PubSubDiscovery within its topic in ETCD (i.e. udp://224.100.10.20:40123).
+ 
+A subscriber, interested in the topic, is informed by the TopologyManager that there is a new endpoint. The TopicReceiver at the subscriber side creates a listening socket based on this endpoint.
+
+Now a data connection is created and data sent by the publisher will be received by the subscriber.  
+
+---
+
+##Properties
+
+<table border="1">
+    <tr><th>Property</th><th>Description</th></tr>
+    <tr><td>PSA_INTERFACE</td><td>Interface which has to be used for multicast communication</td></tr>
+    <tr><td>PSA_IP</td><td>Multicast IP address used by the bundle</td></tr>
+    <tr><td>PSA_MC_PREFIX</td><td>First 2 numbers of the multicast IP address (default 224.100)</td></tr>
+</table>
+
+---
+
+##Shortcomings
+
+1. Per topic a random port number is used for creating an endpoint. It is theoretically possible that the same endpoint is created for 2 topics.
+2. For every message a 32 bit random message ID is generated to discriminate segments of different messages which could be sent at the same time. It is theoretically possible that there are 2 equal message IDs at the same time. But since the message ID is valid only during the transmission of a message (at most some milliseconds with large messages) this is not very likely.
+3. When sending large messages, these messages are segmented and the segments are sent one after another. This could cause UDP buffer overflows in the kernel. A solution could be to add a delay between the sending of the segments, but this will introduce extra latency.
+4. A hash is created, using the message definition, to identify the message type. When 2 messages generate the same hash something will go terribly wrong. A check should be added to prevent this (or another way to identify the message type). This problem is also valid for the other admins.
+
+
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h
new file mode 100644
index 0000000..a21e654
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * large_udp.h
+ *
+ *  \date       Mar 1, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef _LARGE_UDP_H_
+#define _LARGE_UDP_H_
+#include <stdbool.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+/* Opaque handle which holds the reassembly administration of received messages. */
+typedef struct largeUdp  *largeUdp_pt;
+
+/*
+ * Creates a handle. maxNrUdpReceptions is the maximum number of messages that are
+ * reassembled at the same time. Returns NULL on failure.
+ */
+largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions);
+/* Releases the handle and the messages which are still being reassembled. NULL is accepted. */
+void largeUdp_destroy(largeUdp_pt handle);
+
+/*
+ * Send a message that may be larger than one UDP datagram. The data is split in parts,
+ * each part is prefixed with a header and sent with sendmsg() to dest_addr.
+ * largeUdp_sendto takes one contiguous buffer, largeUdp_sendmsg an iovec of len entries.
+ * Return the number of bytes written (headers included) or -1 on error.
+ * NOTE: the implementation passes 0 to sendmsg(), the flags argument is currently not used.
+ */
+int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen);
+int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen);
+/*
+ * Reads one part from fd and stores it in the handle. Returns true when the message
+ * this part belongs to is complete; only then *index and *size are set.
+ */
+bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size);
+/*
+ * Hands out the completed message reported by largeUdp_dataAvailable. On success 0 is
+ * returned and *buffer points to the message data; the handle no longer references that
+ * buffer (presumably the caller has to free() it). Returns -1 if there is no entry at index.
+ */
+int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size);
+
+#endif /* _LARGE_UDP_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
new file mode 100644
index 0000000..19a56a8
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -0,0 +1,71 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.h
+ *
+ *  \date       Dec 5, 2013
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_IMPL_H_
+#define PUBSUB_ADMIN_IMPL_H_
+
+#include "pubsub_admin.h"
+#include "log_helper.h"
+
+/*
+ * Administration of the pubsub admin of this bundle (pubsub_admin_udp_mc).
+ * Every hash map has its own mutex, declared directly above it.
+ */
+struct pubsub_admin {
+
+	bundle_context_pt bundle_context;
+	log_helper_pt loghelper;
+
+	celix_thread_mutex_t localPublicationsLock;
+	hash_map_pt localPublications;//<topic(string),service_factory_pt>
+
+	celix_thread_mutex_t externalPublicationsLock;
+	hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
+
+	celix_thread_mutex_t subscriptionsLock;
+	hash_map_pt subscriptions; //<topic(string),topic_subscription>
+
+	celix_thread_mutex_t pendingSubscriptionsLock;
+	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
+
+	char* ifIpAddress; // The local interface which is used for multicast communication
+    char* mcIpAddress; // The multicast IP address
+
+	int sendSocket; // socket descriptor; NOTE(review): presumably shared by all publications for sending - confirm in pubsub_admin_impl.c
+    void* zmq_context; // to be removed
+
+};
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
+celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic);
+
+#endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
new file mode 100644
index 0000000..57c7963
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -0,0 +1,55 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_publish_service_private.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
+#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
+
+#include "publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#define UDP_BASE_PORT	49152
+#define UDP_MAX_PORT	65000
+
+/*
+ * A message as handled by the UDP admin: a pubsub message header followed by the payload.
+ * payload is a flexible array member, so the struct must be allocated with room for the
+ * payload bytes (presumably payloadSize bytes - confirm against the code that fills it).
+ */
+typedef struct pubsub_udp_msg {
+    struct pubsub_msg_header header;
+    unsigned int payloadSize;
+    char payload[];
+} *pubsub_udp_msg_pt;
+
+typedef struct topic_publication *topic_publication_pt;
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
+
+#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
new file mode 100644
index 0000000..4ec705b
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -0,0 +1,55 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.h
+ *
+ *  \date       Sep 22, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_SUBSCRIPTION_H_
+#define TOPIC_SUBSCRIPTION_H_
+
+#include "celix_threads.h"
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_tracker.h"
+
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+typedef struct topic_subscription* topic_subscription_pt;
+
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
+
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL);
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL);
+
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
+
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
+
+#endif /*TOPIC_SUBSCRIPTION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
new file mode 100644
index 0000000..e5cd5b5
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
@@ -0,0 +1,362 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * large_udp.c
+ *
+ *  \date       Mar 1, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "large_udp.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <array_list.h>
+#include <pthread.h>
+
+#define MAX_UDP_MSG_SIZE 65535   /* 2^16 -1 */
+#define IP_HEADER_SIZE  20
+#define UDP_HEADER_SIZE 8
+//#define MTU_SIZE    1500
+#define MTU_SIZE    8000
+#define MAX_MSG_VECTOR_LEN 64
+
+//#define NO_IP_FRAGMENTATION
+
+// Handle of the large UDP module: the administration of messages which are being reassembled.
+struct largeUdp {
+    unsigned int maxNrLists;     // maximum number of entries in udpPartLists (set from largeUdp_create)
+    array_list_pt udpPartLists;  // list of udpPartList_pt, one entry per message being reassembled
+    pthread_mutex_t dbLock;      // protects udpPartLists
+};
+
+// Reassembly state of one message.
+typedef struct udpPartList {
+    unsigned int msg_ident;         // identification of the message, copied from the part header
+    unsigned int msg_size;          // total size of the message in bytes
+    unsigned int nrPartsRemaining;  // number of parts which still have to be received
+    char *data;                     // buffer of msg_size bytes in which the parts are stored at their offset
+} *udpPartList_pt;
+
+
+// Header which is sent in front of the payload of every part.
+typedef struct msg_part_header {
+    unsigned int msg_ident;       // random id, equal for all parts of one message
+    unsigned int total_msg_size;  // size of the complete message in bytes
+    unsigned int part_msg_size;   // number of payload bytes in this part
+    unsigned int offset;          // position of this part in the complete message
+} msg_part_header_t;
+
+// Maximum number of payload bytes in one part. NO_IP_FRAGMENTATION is commented out above,
+// so the variant based on the maximum UDP message size is the one in use.
+#ifdef NO_IP_FRAGMENTATION
+    #define MAX_PART_SIZE   (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
+#else
+    #define MAX_PART_SIZE   (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
+#endif
+
+// A part as one contiguous block (header + maximum payload). Not referenced by the functions in this file.
+typedef struct msg_part {
+    msg_part_header_t header;
+    char data[MAX_PART_SIZE];
+} msg_part_t;
+
+//
+// Create a handle.
+// maxNrUdpReceptions is the maximum number of messages which are reassembled at the same time.
+// Returns NULL if the handle, its list or its mutex cannot be created.
+//
+largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
+{
+    printf("## Creating large UDP\n");
+    largeUdp_pt handle = calloc(1, sizeof(*handle));
+    if(handle == NULL) {
+        return NULL;
+    }
+
+    handle->maxNrLists = maxNrUdpReceptions;
+    if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
+        free(handle);
+        return NULL;
+    }
+
+    // The mutex may only be initialised when the handle still exists
+    if(pthread_mutex_init(&handle->dbLock, NULL) != 0) {
+        arrayList_destroy(handle->udpPartLists);
+        free(handle);
+        return NULL;
+    }
+
+    return handle;
+}
+
+//
+// Destroys the handle, including all messages which are not completely received yet.
+// A NULL handle is ignored.
+//
+void largeUdp_destroy(largeUdp_pt handle)
+{
+    printf("### Destroying large UDP\n");
+    if(handle == NULL) {
+        return;
+    }
+
+    pthread_mutex_lock(&handle->dbLock);
+    // Free the entries in place. Removing entry i while walking up (as was done before) skips
+    // every second entry because the list shrinks, which leaked those entries.
+    unsigned int nrUdpLists = arrayList_size(handle->udpPartLists);
+    unsigned int i;
+    for(i = 0; i < nrUdpLists; i++) {
+        udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
+        if(udpPartList != NULL) {
+            free(udpPartList->data);
+            free(udpPartList);
+        }
+    }
+    arrayList_destroy(handle->udpPartLists);
+    handle->udpPartLists = NULL;
+    pthread_mutex_unlock(&handle->dbLock);
+    pthread_mutex_destroy(&handle->dbLock);
+    free(handle);
+}
+
+//
+// Write large data to UDP. This function splits the data, given as an iovec of len entries, in
+// chunks of at most MAX_PART_SIZE bytes and sends every chunk with a header over UDP.
+// Returns the number of bytes written (headers included) or -1 on error. When one chunk would
+// need more than MAX_MSG_VECTOR_LEN - 1 iovec entries, -1 is returned and errno is EMSGSIZE.
+// NOTE: handle and flags are not used; sendmsg() is called with flags 0.
+//
+int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
+{
+    int n;
+    int result = 0;
+    int written = 0;
+    msg_part_header_t header;
+
+    header.msg_ident = rand();
+    header.total_msg_size = 0;
+    for(n = 0; n < len ;n++) {
+    	header.total_msg_size += largeMsg_iovec[n].iov_len;
+    }
+    // The receiver expects (total size / MAX_PART_SIZE) + 1 parts, also when the last part is empty
+    int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
+
+    struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
+    struct msghdr msg;
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_name = dest_addr;
+    msg.msg_namelen = addrlen;
+    msg.msg_iov = msg_iovec;
+
+    // Entry 0 is always the header, the payload starts at entry 1
+    msg_iovec[0].iov_base = &header;
+    msg_iovec[0].iov_len = sizeof(header);
+
+    for(n = 0; n < nr_buffers && result == 0; n++) {
+
+        header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) >  MAX_PART_SIZE) ?  MAX_PART_SIZE  : (header.total_msg_size - n * MAX_PART_SIZE));
+        header.offset = n * MAX_PART_SIZE;
+        size_t remainingOffset = header.offset;
+        int recvPart = 0;
+        // find the start of the part
+        while(recvPart < len && remainingOffset > largeMsg_iovec[recvPart].iov_len) {
+        	remainingOffset -= largeMsg_iovec[recvPart].iov_len;
+        	recvPart++;
+        }
+        size_t remainingData = header.part_msg_size;
+        int sendPart = 1;
+
+        // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
+        while(remainingData > 0) {
+        	if(recvPart >= len || sendPart >= MAX_MSG_VECTOR_LEN) {
+        		// Never read past the input iovec or write past msg_iovec
+        		errno = EMSGSIZE;
+        		perror("send()");
+        		result = -1;
+        		break;
+        	}
+        	size_t available = largeMsg_iovec[recvPart].iov_len - remainingOffset;
+        	size_t partLen = (available <= remainingData ? available : remainingData);
+        	msg_iovec[sendPart].iov_base = (char *) largeMsg_iovec[recvPart].iov_base + remainingOffset;
+        	msg_iovec[sendPart].iov_len = partLen;
+        	remainingData -= partLen;
+        	remainingOffset = 0;
+        	sendPart++;
+        	recvPart++;
+        }
+        if(result != 0) {
+            break;
+        }
+        msg.msg_iovlen = sendPart; // header + number of payload entries
+
+        int w = sendmsg(fd, &msg, 0);
+        if(w == -1) {
+            perror("send()");
+            result =  -1;
+            break;
+        }
+        written += w;
+    }
+
+    return (result == 0 ? written : result);
+}
+
+//
+// Write one contiguous buffer of count bytes to UDP. The buffer is cut in chunks of at most
+// MAX_PART_SIZE bytes; every chunk is sent in its own datagram, preceded by a header.
+// Returns the number of bytes written (headers included) or -1 when a datagram could not be sent.
+// NOTE: handle and flags are not used; sendmsg() is called with flags 0.
+//
+int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
+{
+    const int partCount = (count / MAX_PART_SIZE) + 1;
+    char *bytes = buf;
+    int totalSent = 0;
+
+    msg_part_header_t header;
+    header.msg_ident = rand();
+    header.total_msg_size = count;
+
+    // parts[0] is the header, parts[1] the payload of the current chunk
+    struct iovec parts[2];
+    parts[0].iov_base = &header;
+    parts[0].iov_len = sizeof(header);
+
+    struct msghdr msg;
+    msg.msg_name = dest_addr;
+    msg.msg_namelen = addrlen;
+    msg.msg_flags = 0;
+    msg.msg_iov = parts;
+    msg.msg_iovlen = 2;
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+
+    int partNr;
+    for(partNr = 0; partNr < partCount; partNr++) {
+        size_t start = partNr * MAX_PART_SIZE;
+        size_t left = header.total_msg_size - start;
+
+        header.part_msg_size = (left > MAX_PART_SIZE) ? MAX_PART_SIZE : left;
+        header.offset = start;
+        parts[1].iov_base = &bytes[header.offset];
+        parts[1].iov_len = header.part_msg_size;
+
+        int sent = sendmsg(fd, &msg, 0);
+        if(sent == -1) {
+            perror("send()");
+            return -1;
+        }
+        totalSent += sent;
+        // TODO: without a pause between the parts a UDP buffer overflow can occur and parts get lost
+        // at the reception side (at least via localhost); a usleep(1000) here was tried and disabled.
+    }
+
+    return totalSent;
+}
+
+//
+// Consumes and discards the datagram which is first in the receive queue of fd.
+//
+static void largeUdp_dropDatagram(int fd)
+{
+    char scratch;
+    // On a datagram socket a recv() without MSG_PEEK removes the complete datagram,
+    // also when the supplied buffer is smaller than the datagram.
+    if(recv(fd, &scratch, sizeof(scratch), 0) < 0) {
+        perror("recv()");
+    }
+}
+
+//
+// Reads data from the filedescriptor which has data (determined by epoll()) and stores it in the internal structure
+// If the message is completely reassembled true is returned and the index and size have valid values.
+// In all other cases (part stored but message incomplete, malformed part, error) false is returned.
+//
+bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) {
+    msg_part_header_t header;
+    bool result = false;
+
+    // Only read the header, we don't know yet where to store the payload
+    ssize_t peeked = recv(fd, &header, sizeof(header), MSG_PEEK);
+    if(peeked < 0) {
+        perror("read()");
+        return false;
+    }
+
+    // The header comes from the network and decides where the payload is written. Reject parts
+    // with an incomplete header or with a payload which does not fit in the announced message.
+    if((size_t) peeked < sizeof(header)
+            || header.offset > header.total_msg_size
+            || header.part_msg_size > header.total_msg_size - header.offset) {
+        fprintf(stderr, "ERROR: Dropping malformed large UDP part\n");
+        largeUdp_dropDatagram(fd);
+        return false;
+    }
+
+    struct iovec msg_vec[2];
+    struct msghdr msg;
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_iov = msg_vec;
+    msg.msg_iovlen = 2; // header and payload;
+
+    msg_vec[0].iov_base = &header;
+    msg_vec[0].iov_len = sizeof(header);
+
+    pthread_mutex_lock(&handle->dbLock);
+
+    // Look for the administration of the message this part belongs to
+    unsigned int nrUdpLists = arrayList_size(handle->udpPartLists);
+    udpPartList_pt udpPartList = NULL;
+    unsigned int i;
+    for(i = 0; i < nrUdpLists; i++) {
+        udpPartList_pt candidate = arrayList_get(handle->udpPartLists, i);
+        if(candidate->msg_ident != header.msg_ident) {
+            continue;
+        }
+
+        //sanity check
+        if(candidate->msg_size == header.total_msg_size) {
+            udpPartList = candidate;
+        } else {
+            // Corruption occurred. Remove the existing administration and build up a new one.
+            arrayList_remove(handle->udpPartLists, i);
+            free(candidate->data);
+            free(candidate);
+            nrUdpLists--; // keep the count in sync, it is the index of the entry which is added below
+        }
+        break;
+    }
+
+    if(udpPartList != NULL) {
+        // Part of a known message: store it at its offset
+        msg_vec[1].iov_base = &udpPartList->data[header.offset];
+        msg_vec[1].iov_len = header.part_msg_size;
+        if(recvmsg(fd, &msg, 0) < 0) {
+            perror("recvmsg()");
+        } else {
+            if(udpPartList->nrPartsRemaining > 0) {
+                udpPartList->nrPartsRemaining--;
+            }
+            if(udpPartList->nrPartsRemaining == 0) {
+                *index = i;
+                *size = udpPartList->msg_size;
+                result = true;
+            }
+        }
+    } else {
+        // First part of a new message
+        if(nrUdpLists > 0 && nrUdpLists >= handle->maxNrLists) {
+            // remove list at index 0
+            udpPartList_pt oldest = arrayList_remove(handle->udpPartLists, 0);
+            if(oldest != NULL) {
+                fprintf(stderr, "ERROR: Removing entry for id %u: %u parts not received\n",oldest->msg_ident, oldest->nrPartsRemaining );
+                free(oldest->data);
+                free(oldest);
+            }
+            nrUdpLists--;
+        }
+
+        udpPartList = calloc(1, sizeof(*udpPartList));
+        if(udpPartList != NULL) {
+            udpPartList->msg_ident =  header.msg_ident;
+            udpPartList->msg_size =  header.total_msg_size;
+            // This first part is not counted, so only the parts after it remain
+            udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
+            // Allocate at least one byte so a NULL result always means out of memory
+            udpPartList->data = calloc(header.total_msg_size > 0 ? header.total_msg_size : 1, sizeof(char));
+        }
+
+        if(udpPartList == NULL || udpPartList->data == NULL) {
+            fprintf(stderr, "ERROR: Out of memory, dropping large UDP part\n");
+            free(udpPartList);
+            largeUdp_dropDatagram(fd);
+        } else {
+            msg_vec[1].iov_base = &udpPartList->data[header.offset];
+            msg_vec[1].iov_len = header.part_msg_size;
+            if(recvmsg(fd, &msg, 0) < 0) {
+                perror("recvmsg()");
+                free(udpPartList->data);
+                free(udpPartList);
+            } else {
+                arrayList_add(handle->udpPartLists, udpPartList);
+
+                if(udpPartList->nrPartsRemaining == 0) {
+                    *index = nrUdpLists; // the new entry is appended at the end of the list
+                    *size = udpPartList->msg_size;
+                    result = true;
+                }
+            }
+        }
+    }
+    pthread_mutex_unlock(&handle->dbLock);
+
+    return result;
+}
+
+//
+// Hands out the message which is indicated available by the largeUdp_dataAvailable function.
+// The entry at index is removed from the administration and *buffer is set to its data.
+// Returns 0 on success and -1 if there is no entry at index. The size argument is not used.
+//
+int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size)
+{
+    int status = -1;
+
+    pthread_mutex_lock(&handle->dbLock);
+    udpPartList_pt entry = arrayList_remove(handle->udpPartLists, index);
+    if(entry != NULL) {
+        // Only the bookkeeping is released, the data buffer is passed on to the caller
+        *buffer = entry->data;
+        free(entry);
+        status = 0;
+    }
+    pthread_mutex_unlock(&handle->dbLock);
+
+    return status;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
new file mode 100644
index 0000000..053c757
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
@@ -0,0 +1,113 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * psa_activator.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_registration.h"
+
+#include "pubsub_admin_impl.h"
+
+/* Per-bundle state shared between the bundleActivator_* lifecycle callbacks. */
+struct activator {
+	/* Admin instance; created in bundleActivator_create, destroyed in bundleActivator_destroy. */
+	pubsub_admin_pt admin;
+	/* Service struct allocated in bundleActivator_start and freed in bundleActivator_stop. */
+	pubsub_admin_service_pt adminService;
+	/* Registration filled in by bundleContext_registerService; unregistered in bundleActivator_stop. */
+	service_registration_pt registration;
+};
+
+/**
+ * Allocates the activator state and creates the pubsub admin instance.
+ *
+ * On successful allocation *userData is set to the new activator before
+ * pubsubAdmin_create is called, so it is set even when that call fails.
+ *
+ * Returns CELIX_ENOMEM when the activator cannot be allocated (in which
+ * case *userData is left untouched), otherwise the status returned by
+ * pubsubAdmin_create.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	struct activator *act = calloc(1, sizeof(*act));
+
+	if (act == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	*userData = act;
+
+	return pubsubAdmin_create(context, &(act->admin));
+}
+
+/**
+ * Builds the pubsub admin service struct and registers it with the framework.
+ *
+ * The service struct is heap-allocated, wired to the pubsubAdmin_* functions
+ * and to the admin instance held by the activator, and then registered under
+ * PUBSUB_ADMIN_SERVICE. On success the struct is kept in
+ * activator->adminService (it is freed in bundleActivator_stop).
+ *
+ * If registration fails the struct is freed here and activator->adminService
+ * is reset to NULL, so it is not leaked when stop is never invoked and a
+ * later free() in stop remains harmless.
+ *
+ * Returns CELIX_ENOMEM when the service struct cannot be allocated, otherwise
+ * the status returned by bundleContext_registerService.
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+	pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc));
+
+	if (!pubsubAdminSvc) {
+		return CELIX_ENOMEM;
+	}
+
+	pubsubAdminSvc->admin = activator->admin;
+
+	pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
+	pubsubAdminSvc->removePublication = pubsubAdmin_removePublication;
+
+	pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
+	pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription;
+
+	pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
+	pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+
+	activator->adminService = pubsubAdminSvc;
+
+	status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
+	if (status != CELIX_SUCCESS) {
+		/* Registration failed: nothing else references the struct, release it. */
+		activator->adminService = NULL;
+		free(pubsubAdminSvc);
+	}
+
+	return status;
+}
+
+/**
+ * Undoes bundleActivator_start: unregisters the admin service, stops the
+ * admin and frees the service struct.
+ *
+ * The return values of the individual teardown calls are not inspected;
+ * this function always returns CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	/* Withdraw the service first, then stop the admin behind it. */
+	serviceRegistration_unregister(act->registration);
+	act->registration = NULL;
+
+	pubsubAdmin_stop(act->admin);
+
+	/* The service struct was allocated in bundleActivator_start. */
+	free(act->adminService);
+	act->adminService = NULL;
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Destroys the admin instance and releases the activator state allocated in
+ * bundleActivator_create. Always returns CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	pubsubAdmin_destroy(act->admin);
+	act->admin = NULL;
+
+	free(act);
+
+	return CELIX_SUCCESS;
+}
+
+