You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:30 UTC
[18/19] celix git commit: CELIX-389: Refactors pubsub.
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
deleted file mode 100644
index 834dada..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publisher_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_PUBLISHER_PRIVATE_H_
-#define PUBSUB_PUBLISHER_PRIVATE_H_
-
-#include <celixbool.h>
-#include <pthread.h>
-#include "publisher.h"
-
-struct pubsub_sender {
- array_list_pt trackers;
- const char *ident;
- hash_map_pt tid_map; //service -> tid
- long bundleId;
-};
-
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
- pubsub_publisher_pt service;
- pubsub_sender_pt publisher;
- const char *topic;
-} *send_thread_struct_pt;
-
-pubsub_sender_pt publisher_create(array_list_pt trackers, const char* ident,long bundleId);
-
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
-
-void publisher_destroy(pubsub_sender_pt client);
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
-
-
-#endif /* PUBSUB_PUBLISHER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
deleted file mode 100644
index e4a8ba8..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ps_pub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <sys/cdefs.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
-
-#include "pubsub_common.h"
-#include "pubsub_utils.h"
-#include "publisher.h"
-#include "pubsub_publisher_private.h"
-
-#define PUB_TOPIC "poi1;poi2"
-
-struct publisherActivator {
- pubsub_sender_pt client;
- array_list_pt trackerList;//List<service_tracker_pt>
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- struct publisherActivator * act = malloc(sizeof(*act));
-
- const char* fwUUID = NULL;
-
- bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID==NULL){
- printf("PUBLISHER: Cannot retrieve fwUUID.\n");
- return CELIX_INVALID_BUNDLE_CONTEXT;
- }
-
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- arrayList_create(&(act->trackerList));
- act->client = publisher_create(act->trackerList,fwUUID,bundleId);
- *userData = act;
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
- struct publisherActivator * act = (struct publisherActivator *) userData;
-
- int i;
- array_list_pt topic_list = pubsub_getTopicsFromString(PUB_TOPIC);
-
- if(topic_list !=NULL){
-
- char filter[128];
- for(i=0; i<arrayList_size(topic_list);i++){
- char* topic = arrayList_get(topic_list,i);
- if(strlen(topic)<MAX_TOPIC_LEN){
-
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- service_tracker_pt tracker = NULL;
- memset(filter,0,128);
-#ifdef USE_SCOPE
- char *scope;
- asprintf(&scope, "my_scope_%d", i);
- snprintf(filter, 128, "(&(&(%s=%s)(%s=%s))(%s=%s))",
- (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME,
- PUBSUB_PUBLISHER_TOPIC, topic,
- PUBLISHER_SCOPE, scope);
- free(scope);
-#else
- snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
- (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME,
- PUBSUB_PUBLISHER_TOPIC, topic);
-#endif
- service_tracker_customizer_pt customizer = NULL;
- serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
- serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
- arrayList_add(act->trackerList,tracker);
- }
- else{
- printf("Topic %s too long. Skipping publication.\n",topic);
- }
- free(topic);
- }
- arrayList_destroy(topic_list);
-
- }
-
- publisher_start(act->client);
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_open(tracker);
- }
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_close(tracker);
- }
- publisher_stop(act->client);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_destroy(tracker);
- }
-
- publisher_destroy(act->client);
- arrayList_destroy(act->trackerList);
-
- free(act);
- printf("PUBLISHER: bundleActivator_destroy\n");
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
deleted file mode 100644
index 66454a0..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publisher.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-
-#include "service_tracker.h"
-#include "celix_threads.h"
-
-#include "pubsub_common.h"
-#include "poi.h"
-
-#include "pubsub_publisher_private.h"
-
-static bool stop=false;
-
-static double randCoordinate(double min, double max){
-
- double ret = min + (((double)rand()) / (((double)RAND_MAX)/(max-min))) ;
-
- return ret;
-
-}
-
-static void* send_thread(void* arg){
-
- send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
- pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher;
-
- char fwUUID[9];
- memset(fwUUID,0,9);
- memcpy(fwUUID,publisher->ident,8);
-
- //poi_t point = calloc(1,sizeof(*point));
- location_t place = calloc(1,sizeof(*place));
-
- char* desc = calloc(64,sizeof(char));
- snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
-
- char* name = calloc(64,sizeof(char));
- snprintf(name,64,"Bundle#%ld",publisher->bundleId);
-
- place->name = name;
- place->description = desc;
- place->extra = "DONT PANIC";
- printf("TOPIC : %s\n",st_struct->topic);
- unsigned int msgId = 0;
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0 ){
-
- while(stop==false){
- place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
- place->position.lon = randCoordinate(MIN_LON,MAX_LON);
- int nr_char = (int)randCoordinate(5,100000);
- //int nr_char = 25;
- place->data = calloc(nr_char, 1);
- for(int i = 0; i < (nr_char-1); i++) {
- place->data[i] = i%10 + '0';
- }
- if(publish_svc->send) {
- publish_svc->send(publish_svc->handle,msgId,place);
- } else {
- printf("No send for %s\n", st_struct->topic);
- }
- printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
- free(place->data);
- sleep(2);
- }
- }
- else{
- printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_POI_NAME);
- }
-
- free(place->description);
- free(place->name);
- free(place);
-
- free(st_struct);
-
-
- return NULL;
-
-}
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
- pubsub_sender_pt publisher = malloc(sizeof(*publisher));
-
- publisher->trackers = trackers;
- publisher->ident = ident;
- publisher->bundleId = bundleId;
- publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
-
- return publisher;
-}
-
-void publisher_start(pubsub_sender_pt client) {
- printf("PUBLISHER: starting up...\n");
-}
-
-void publisher_stop(pubsub_sender_pt client) {
- printf("PUBLISHER: stopping...\n");
-}
-
-void publisher_destroy(pubsub_sender_pt client) {
- hashMap_destroy(client->tid_map, false, false);
- client->trackers = NULL;
- client->ident = NULL;
- free(client);
-}
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-
- printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
-
- send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
- const char *value = NULL;
- serviceReference_getProperty(reference, PUBSUB_PUBLISHER_TOPIC, &value);
- data->service = publish_svc;
- data->publisher = manager;
- data->topic = value;
- celix_thread_t *tid = malloc(sizeof(*tid));
- stop=false;
- celixThread_create(tid,NULL,send_thread,(void*)data);
- hashMap_put(manager->tid_map, publish_svc, tid);
- return CELIX_SUCCESS;
-}
-
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
- celix_thread_t *tid = hashMap_get(manager->tid_map, service);
-
-#if defined(__APPLE__) && defined(__MACH__)
- uint64_t threadid;
- pthread_threadid_np(tid->thread, &threadid);
- printf("PUBLISHER: publish service unexporting (%s) %llu!\n",manager->ident, threadid);
-#else
- printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread);
-#endif
-
- stop=true;
- celixThread_join(*tid,NULL);
- free(tid);
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt
deleted file mode 100644
index b83f7dd..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-include_directories("../publisher/private/include")
-include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
-
-add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher2
- SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2"
- VERSION "1.0.0"
- SOURCES
- ../publisher/private/src/ps_pub_activator.c
- ../publisher/private/src/pubsub_publisher.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
-)
-
-bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
- DESTINATION "META-INF/descriptors"
-)
-
-bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
- DESTINATION "META-INF/topics/pub"
-)
-
-bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
- ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher
- DESTINATION "META-INF/keys"
-)
-
-bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
- ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber/public
- DESTINATION "META-INF/keys/subscriber"
-)
-
-target_link_libraries(org.apache.celix.pubsub_publisher.PoiPublisher2 celix_framework celix_utils)
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt
deleted file mode 100644
index 7fd9fae..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ /dev/null
@@ -1,55 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-include_directories("private/include")
-include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/common/include")
-
-add_bundle(org.apache.celix.pubsub_subscriber.PoiSubscriber
- SYMBOLIC_NAME "apache_celix_pubsub_poi_subscriber"
- VERSION "1.0.0"
- SOURCES
- private/src/ps_sub_activator.c
- private/src/pubsub_subscriber.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
-)
-
-bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
- DESTINATION "META-INF/descriptors"
-)
-
-bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
- ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
- DESTINATION "META-INF/topics/sub"
-)
-
-bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
- ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber
- DESTINATION "META-INF/keys"
-)
-
-bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
- ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher/public
- DESTINATION "META-INF/keys/publisher"
-)
-
-target_link_libraries(org.apache.celix.pubsub_subscriber.PoiSubscriber celix_framework celix_utils)
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
deleted file mode 100644
index c6072df..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_subscriber_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
-#define PUBSUB_SUBSCRIBER_PRIVATE_H_
-
-
-#include <string.h>
-
-#include "celixbool.h"
-
-#include "pubsub_common.h"
-#include "subscriber.h"
-
-struct pubsub_receiver {
- char * name;
-};
-
-typedef struct pubsub_receiver* pubsub_receiver_pt;
-
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
-
-
-#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
deleted file mode 100644
index efd34c9..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ps_sub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-
-#include "pubsub_common.h"
-#include "pubsub_utils.h"
-#include "pubsub_subscriber_private.h"
-
-#define SUB_TOPIC "poi1;poi2"
-
-struct subscriberActivator {
- array_list_pt registrationList; //List<service_registration_pt>
- pubsub_subscriber_pt subsvc;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- struct subscriberActivator * act = calloc(1,sizeof(struct subscriberActivator));
- *userData = act;
- arrayList_create(&(act->registrationList));
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-
- pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
- pubsub_receiver_pt sub = subscriber_create(SUB_TOPIC);
- subsvc->handle = sub;
- subsvc->receive = pubsub_subscriber_recv;
-
- act->subsvc = subsvc;
-
- array_list_pt topic_list = pubsub_getTopicsFromString(SUB_TOPIC);
-
- if(topic_list !=NULL){
-
- int i;
- for(i=0; i<arrayList_size(topic_list);i++){
- char* topic = arrayList_get(topic_list,i);
- if(strlen(topic)<MAX_TOPIC_LEN){
- properties_pt props = properties_create();
- properties_set(props, PUBSUB_SUBSCRIBER_TOPIC,topic);
-#ifdef USE_SCOPE
- char *scope;
- asprintf(&scope, "my_scope_%d", i);
- properties_set(props,SUBSCRIBER_SCOPE,scope);
- free(scope);
-#endif
- service_registration_pt reg = NULL;
- bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, ®);
- arrayList_add(act->registrationList,reg);
- }
- else{
- printf("Topic %s too long. Skipping subscription.\n",topic);
- }
- free(topic);
- }
- arrayList_destroy(topic_list);
-
- }
-
- subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
- int i;
- for(i=0; i<arrayList_size(act->registrationList);i++){
- service_registration_pt reg = arrayList_get(act->registrationList,i);
- serviceRegistration_unregister(reg);
-
- }
-
- subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
- act->subsvc->receive = NULL;
- subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
- act->subsvc->handle = NULL;
- free(act->subsvc);
- act->subsvc = NULL;
-
- arrayList_destroy(act->registrationList);
- free(act);
-
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
deleted file mode 100644
index a137253..0000000
--- a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_subscriber.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "poi.h"
-#include "pubsub_subscriber_private.h"
-
-pubsub_receiver_pt subscriber_create(char* topics) {
- pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
- sub->name = strdup(topics);
- return sub;
-}
-
-
-void subscriber_start(pubsub_receiver_pt subscriber){
- printf("Subscriber started...\n");
-}
-
-void subscriber_stop(pubsub_receiver_pt subscriber){
- printf("Subscriber stopped...\n");
-}
-
-void subscriber_destroy(pubsub_receiver_pt subscriber){
- if(subscriber->name!=NULL){
- free(subscriber->name);
- }
- subscriber->name=NULL;
- free(subscriber);
-}
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
-
- location_t place = (location_t)msg;
- int nrchars = 25;
- printf("Recv (%s): [%f, %f] (%s, %s) data_len = %ld data =%*.*s\n",msgType, place->position.lat, place->position.lon,place->name,place->description, strlen(place->data) + 1, nrchars, nrchars, place->data);
-
- return 0;
-
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/keygen/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/keygen/CMakeLists.txt b/celix-pubsub/pubsub/keygen/CMakeLists.txt
deleted file mode 100644
index bc42173..0000000
--- a/celix-pubsub/pubsub/keygen/CMakeLists.txt
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if (BUILD_ZMQ_SECURITY)
-
- find_package(ZMQ REQUIRED)
- find_package(CZMQ REQUIRED)
- find_package(OpenSSL 1.1.0 REQUIRED)
-
- include_directories("${ZMQ_INCLUDE_DIR}")
- include_directories("${CZMQ_INCLUDE_DIR}")
- include_directories("${OPENSSL_INCLUDE_DIR}")
-
- add_executable(makecert makecert.c)
- target_link_libraries(makecert ${CZMQ_LIBRARIES})
-
- add_executable(ed_file ed_file.c)
- target_link_libraries(ed_file ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
-
-endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/keygen/ed_file.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/keygen/ed_file.c b/celix-pubsub/pubsub/keygen/ed_file.c
deleted file mode 100644
index a0fc7e2..0000000
--- a/celix-pubsub/pubsub/keygen/ed_file.c
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ed_file.c
- *
- * \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <czmq.h>
-#include <openssl/evp.h>
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <strings.h>
-
-#define MAX_KEY_FILE_LENGTH 256
-#define MAX_LINE_LENGTH 64
-#define AES_KEY_LENGTH 32
-#define AES_IV_LENGTH 16
-
-#define KEY_TO_GET "aes_key"
-#define IV_TO_GET "aes_iv"
-
-int generate_sha256_hash(char* text, unsigned char* digest);
-int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext);
-int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
-
-static char* read_keys_file_content(const char *filePath);
-static void parse_key_lines(char *keysBuffer, char **key, char **iv);
-static void parse_key_line(char *line, char **key, char **iv);
-
-int main(int argc, const char* argv[])
-{
- if (argc < 4){
- printf("Usage: %s <key_file> <input_file> <output_file> [options]\n", argv[0]);
- printf("Default behavior: encrypting a file\n");
- printf("Options:\n");
- printf("\t-d\tSpecify to decrypt a file\n");
- printf("\n");
- return EXIT_FAILURE;
- }
-
- int rc = 0;
-
- const char* keys_file_path = argv[1];
- const char* input_file_path = argv[2];
- const char* output_file_path = argv[3];
-
- bool decryptParam = false;
- if (argc > 4 && strcmp(argv[4], "-d") == 0){
- decryptParam = true;
- }
-
- if (!zsys_file_exists(keys_file_path)){
- printf("Keys file '%s' doesn't exist!\n", keys_file_path);
- return EXIT_FAILURE;
- }
-
- if (!zsys_file_exists(input_file_path)){
- printf("Input file does not exist!\n");
- return EXIT_FAILURE;
- }
-
- char* keys_data = read_keys_file_content(keys_file_path);
- if (keys_data == NULL){
- return EXIT_FAILURE;
- }
-
- char* key = NULL;
- char* iv = NULL;
- parse_key_lines(keys_data, &key, &iv);
- free(keys_data);
-
- if (key == NULL || iv == NULL){
- printf("Loading AES key and/or AES iv failed!\n");
- free(key);
- free(iv);
- return EXIT_FAILURE;
- }
-
- printf("Using AES Key \t\t'%s'\n", key);
- printf("Using AES IV \t\t'%s'\n", iv);
- printf("Input file path \t'%s'\n", input_file_path);
- printf("Output file path \t'%s'\n", output_file_path);
- printf("Decrypting \t\t'%s'\n\n", (decryptParam) ? "true" : "false");
-
- unsigned char key_digest[EVP_MAX_MD_SIZE];
- unsigned char iv_digest[EVP_MAX_MD_SIZE];
- generate_sha256_hash((char*) key, key_digest);
- generate_sha256_hash((char*) iv, iv_digest);
-
- zchunk_t* input_chunk = zchunk_slurp (input_file_path, 0);
- if (input_chunk == NULL){
- printf("Input file not correct!\n");
- free(key);
- free(iv);
- return EXIT_FAILURE;
- }
-
- //Load input data from file
- int input_file_size = (int) zchunk_size (input_chunk);
- char* input_file_data = zchunk_strdup(input_chunk);
- zchunk_destroy (&input_chunk);
-
- int output_len;
- unsigned char output[input_file_size];
- if (decryptParam){
- output_len = decrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
- output[output_len] = '\0';
- }else{
- output_len = encrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output);
- }
-
- //Write output data to file
- zfile_t* output_file = zfile_new (".", output_file_path);
- zchunk_t* output_chunk = zchunk_new(output, output_len);
- rc = zfile_output (output_file);
- if (rc != 0){
- printf("Problem with opening file for writing!\n");
- zchunk_destroy (&output_chunk);
- zfile_close (output_file);
- zfile_destroy (&output_file);
- free(input_file_data);
- free(key);
- free(iv);
-
- return EXIT_FAILURE;
- }
-
- rc = zfile_write (output_file, output_chunk, 0);
- if (rc != 0){
- printf("Problem with writing output to file!\n");
- }
- printf("Output written to file:\n");
- if (decryptParam){
- printf("%s\n", output);
- }else{
- BIO_dump_fp (stdout, (const char *) output, output_len);
- }
-
- zchunk_destroy (&output_chunk);
- zfile_close (output_file);
- zfile_destroy (&output_file);
- free(input_file_data);
- free(key);
- free(iv);
-
- return EXIT_SUCCESS;
-}
-
-int generate_sha256_hash(char* text, unsigned char* digest)
-{
- unsigned int digest_len;
-
- EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
- EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL);
- EVP_DigestUpdate(mdctx, text, strlen(text));
- EVP_DigestFinal_ex(mdctx, digest, &digest_len);
- EVP_MD_CTX_free(mdctx);
-
- return digest_len;
-}
-
-int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext)
-{
- int len;
- int ciphertext_len;
-
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
-
- EVP_EncryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, plaintext_len);
- ciphertext_len = len;
- EVP_EncryptFinal_ex(ctx, ciphertext + len, &len);
- ciphertext_len += len;
-
- EVP_CIPHER_CTX_free(ctx);
-
- return ciphertext_len;
-}
-
-int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
-{
- int len;
- int plaintext_len;
-
- EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
-
- EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv);
- EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len);
- plaintext_len = len;
- EVP_DecryptFinal_ex(ctx, plaintext + len, &len);
- plaintext_len += len;
-
- EVP_CIPHER_CTX_free(ctx);
-
- return plaintext_len;
-}
-
-static char* read_keys_file_content(const char *keys_file_path){
- char* keys_file_full_path = strndup(keys_file_path, MAX_KEY_FILE_LENGTH);
- char* keys_file_name = NULL;
-
- char* sep_kf_at = strrchr(keys_file_path, '/');
- if (sep_kf_at != NULL){
- *sep_kf_at = '\0';
- keys_file_name = sep_kf_at + 1;
- }else{
- keys_file_name = (char*) keys_file_path;
- keys_file_path = (const char*) ".";
- }
-
- printf("Keys file path: %s\n", keys_file_full_path);
-
- int rc = 0;
-
- zfile_t* keys_file = zfile_new (keys_file_path, keys_file_name);
- rc = zfile_input (keys_file);
- if (rc != 0){
- printf("Keys file '%s' not readable!\n", keys_file_full_path);
- zfile_destroy(&keys_file);
- free(keys_file_full_path);
- return NULL;
- }
-
- ssize_t keys_file_size = zsys_file_size (keys_file_full_path);
- zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0);
- if (keys_chunk == NULL){
- printf("Can't read file '%s'!\n", keys_file_full_path);
- zfile_close(keys_file);
- zfile_destroy(&keys_file);
- free(keys_file_full_path);
- return NULL;
- }
-
- char* keys_data = zchunk_strdup(keys_chunk);
- zchunk_destroy(&keys_chunk);
- zfile_close(keys_file);
- zfile_destroy (&keys_file);
-
- return keys_data;
-}
-
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
- char *line = NULL, *saveLinePointer = NULL;
-
- bool firstTime = true;
- do {
- if (firstTime){
- line = strtok_r(keysBuffer, "\n", &saveLinePointer);
- firstTime = false;
- }else {
- line = strtok_r(NULL, "\n", &saveLinePointer);
- }
-
- if (line == NULL){
- break;
- }
-
- parse_key_line(line, key, iv);
-
- } while((*key == NULL || *iv == NULL) && line != NULL);
-
-}
-
-static void parse_key_line(char *line, char **key, char **iv){
- char *detectedKey = NULL, *detectedValue= NULL;
-
- char* sep_at = strchr(line, ':');
- if (sep_at == NULL){
- return;
- }
-
- *sep_at = '\0'; // overwrite first separator, creating two strings.
- detectedKey = line;
- detectedValue = sep_at + 1;
-
- if (detectedKey == NULL || detectedValue == NULL){
- return;
- }
- if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
- return;
- }
-
- if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
- *key = strndup(detectedValue, AES_KEY_LENGTH);
- } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
- *iv = strndup(detectedValue, AES_IV_LENGTH);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/keygen/makecert.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/keygen/makecert.c b/celix-pubsub/pubsub/keygen/makecert.c
deleted file mode 100644
index 166111e..0000000
--- a/celix-pubsub/pubsub/keygen/makecert.c
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * makecert.c
- *
- * \date Dec 2, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <string.h>
-
-#include "czmq.h"
-
-int main (int argc, const char * argv[])
-{
-
- const char * cert_name_public = "certificate.pub";
- const char * cert_name_secret = "certificate.key";
- if (argc == 3 && strcmp(argv[1], argv[2]) != 0){
- cert_name_public = argv[1];
- cert_name_secret = argv[2];
- }
-
- zcert_t * cert = zcert_new();
-
- char *timestr = zclock_timestr ();
- zcert_set_meta (cert, "date-created", timestr);
- free (timestr);
-
- zcert_save_public(cert, cert_name_public);
- zcert_save_secret(cert, cert_name_secret);
- zcert_print (cert);
- printf("\n");
- printf("I: CURVE certificate created in %s and %s\n", cert_name_public, cert_name_secret);
- zcert_destroy (&cert);
-
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
deleted file mode 100644
index dd25b19..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-find_package(Jansson REQUIRED)
-
-include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/dfi/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
-include_directories("private/include")
-include_directories("public/include")
-include_directories("${JANSSON_INCLUDE_DIR}")
-if (SERIALIZER_PATH)
- include_directories("${SERIALIZER_PATH}/include")
-endif()
-if (SERIALIZER_LIB_INCLUDE_DIR)
- include_directories("${SERIALIZER_LIB_INCLUDE_DIR}")
-endif()
-if (SERIALIZER_LIB_DIR)
- link_directories("${SERIALIZER_LIB_DIR}")
-endif()
-
-add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
- VERSION "1.0.0"
- SOURCES
- private/src/psa_activator.c
- private/src/pubsub_admin_impl.c
- private/src/topic_subscription.c
- private/src/topic_publication.c
- private/src/large_udp.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
- ${PUBSUB_SERIALIZER_SRC}
-)
-
-set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES} ${SERIALIZER_LIBRARY})
-
-install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc)
-
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md b/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md
deleted file mode 100644
index 19c7b86..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md
+++ /dev/null
@@ -1,62 +0,0 @@
-#PUBSUB-Admin UDP Multicast
-
----
-
-##Description
-
-This description is particular for the UDP-Multicast PUB-SUB.
-
-The UDP multicast pubsub admin is used to transfer user data transparent via UDP multicast. UDP packets can contain approximately
-64kB . To overcome this limit the admin has a protocol on top of UDP which fragments the data to be send and these
-fragments are reassembled at the reception side.
-
-### IP Addresses
-
-To use UDP-multicast 2 IP adresses are needed:
-
-1. IP address which is bound to an (ethernet) interface
-2. The multicast address (in the range 224.X.X.X - 239.X.X.X)
-
-When the PubSubAdmin starts it determines the bound IP address. This is done in the order:
-
-1. The first IP number bound to the interface which is set by the "PSA_INTERFACE" property
-2. The interfaces are iterated and the first IP number found is used. (typically this is 127.0.0.1 (localhost)
-
-The Multicass IP address is determined in the order:
-
-1. If the `PSA_IP` property is defined, this IP will be used as multicast.
-2. If the `PSA_MC_PREFIX` property, is defined, this property is used as the first 2 numbers of the multicast address extended with the last 2 numbers of the bound IP.
-3. If the `PSA_MC_PREFIX` property is not defined `224.100` is used.
-
-### Discovery
-
-When a publisher request for a topic a TopicSender is created by a ServiceFactory. This TopicSender uses the multicast address as described above with a random chosen portnumber. The combination of the multicast-IP address with the portnumber and protocol(udp) is the endpoint.
-This endpoint is published by the PubSubDiscovery within its topic in ETCD (i.e. udp://224.100.10.20:40123).
-
-A subscriber, interested in the topic, is informed by the the ToplogyManager that there is a new endpoint. The TopicReceiver at the subscriber side creates a listening socket based on this endpoint.
-
-Now a data-connection is created and data send by the publisher will be received by the subscriber.
-
----
-
-##Properties
-
-<table border="1">
- <tr><th>Property</th><th>Description</th></tr>
- <tr><td>PSA_INTERFACE</td><td>Interface which has to be used for multicast communication</td></tr>
- <tr><td>PSA_IP</td><td>Multicast IP address used by the bundle</td></tr>
- <tr><td>PSA_MC_PREFIX</td><td>First 2 digits of the MC IP address </td></tr>
-</table>
-
----
-
-##Shortcomings
-
-1. Per topic a random portnr is used for creating an endpoint. It is theoretical possible that for 2 topic the same endpoint is created.
-2. For every message a 32 bit random message ID is generated to discriminate segments of different messages which could be sent at the same time. It is theoretically possible that there are 2 equal message ID's at the same time. But since the mesage ID is valid only during the transmission of a message (maximum some milliseconds with large messages) this is not very plausible.
-3. When sending large messages, these messages are segmented and sent after each other. This could cause UDP-buffer overflows in the kernel. A solution could be to add a delay between sending of the segements but this will introduce extra latency.
-4. A Hash is created, using the message definition, to identify the message type. When 2 messages generate the same hash something will terribly go wrong. A check should be added to prevent this (or another way to identify the message type). This problem is also valid for the other admins.
-
-
-
-
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h
deleted file mode 100644
index a21e654..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * large_udp.h
- *
- * \date Mar 1, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef _LARGE_UDP_H_
-#define _LARGE_UDP_H_
-#include <stdbool.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-
-typedef struct largeUdp *largeUdp_pt;
-
-largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions);
-void largeUdp_destroy(largeUdp_pt handle);
-
-int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen);
-int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen);
-bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size);
-int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size);
-
-#endif /* _LARGE_UDP_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
deleted file mode 100644
index 35fc164..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.h
- *
- * \date Dec 5, 2013
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ADMIN_IMPL_H_
-#define PUBSUB_ADMIN_IMPL_H_
-
-#include "pubsub_admin.h"
-#include "log_helper.h"
-
-struct pubsub_admin {
-
- bundle_context_pt bundle_context;
- log_helper_pt loghelper;
-
- celix_thread_mutex_t localPublicationsLock;
- hash_map_pt localPublications;//<topic(string),service_factory_pt>
-
- celix_thread_mutex_t externalPublicationsLock;
- hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
-
- celix_thread_mutex_t subscriptionsLock;
- hash_map_pt subscriptions; //<topic(string),topic_subscription>
-
- celix_thread_mutex_t pendingSubscriptionsLock;
- hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
-
- char* ifIpAddress; // The local interface which is used for multicast communication
- char* mcIpAddress; // The multicast IP address
-
- int sendSocket;
-
-};
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
-celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
-
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic);
-
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
-
-#endif /* PUBSUB_ADMIN_IMPL_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
deleted file mode 100644
index 57c7963..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publish_service_private.h
- *
- * \date Sep 24, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-
-#include "publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-
-#define UDP_BASE_PORT 49152
-#define UDP_MAX_PORT 65000
-
-typedef struct pubsub_udp_msg {
- struct pubsub_msg_header header;
- unsigned int payloadSize;
- char payload[];
-} *pubsub_udp_msg_pt;
-
-typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out);
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
-
-#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
deleted file mode 100644
index 4ec705b..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_subscription.h
- *
- * \date Sep 22, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef TOPIC_SUBSCRIPTION_H_
-#define TOPIC_SUBSCRIPTION_H_
-
-#include "celix_threads.h"
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_tracker.h"
-
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-
-typedef struct topic_subscription* topic_subscription_pt;
-
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out);
-celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
-celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
-celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
-
-celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL);
-celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL);
-
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
-
-celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
-celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
-unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
-
-#endif /*TOPIC_SUBSCRIPTION_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
deleted file mode 100644
index e5cd5b5..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * large_udp.c
- *
- * \date Mar 1, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include "large_udp.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <errno.h>
-#include <array_list.h>
-#include <pthread.h>
-
-#define MAX_UDP_MSG_SIZE 65535 /* 2^16 -1 */
-#define IP_HEADER_SIZE 20
-#define UDP_HEADER_SIZE 8
-//#define MTU_SIZE 1500
-#define MTU_SIZE 8000
-#define MAX_MSG_VECTOR_LEN 64
-
-//#define NO_IP_FRAGMENTATION
-
-struct largeUdp {
- unsigned int maxNrLists;
- array_list_pt udpPartLists;
- pthread_mutex_t dbLock;
-};
-
-typedef struct udpPartList {
- unsigned int msg_ident;
- unsigned int msg_size;
- unsigned int nrPartsRemaining;
- char *data;
-} *udpPartList_pt;
-
-
-typedef struct msg_part_header {
- unsigned int msg_ident;
- unsigned int total_msg_size;
- unsigned int part_msg_size;
- unsigned int offset;
-} msg_part_header_t;
-
-#ifdef NO_IP_FRAGMENTATION
- #define MAX_PART_SIZE (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
-#else
- #define MAX_PART_SIZE (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
-#endif
-
-typedef struct msg_part {
- msg_part_header_t header;
- char data[MAX_PART_SIZE];
-} msg_part_t;
-
-//
-// Create a handle
-//
-largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
-{
- printf("## Creating large UDP\n");
- largeUdp_pt handle = calloc(sizeof(*handle), 1);
- if(handle != NULL) {
- handle->maxNrLists = maxNrUdpReceptions;
- if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
- free(handle);
- handle = NULL;
- }
- pthread_mutex_init(&handle->dbLock, 0);
- }
-
- return handle;
-}
-
-//
-// Destroys the handle
-//
-void largeUdp_destroy(largeUdp_pt handle)
-{
- printf("### Destroying large UDP\n");
- if(handle != NULL) {
- pthread_mutex_lock(&handle->dbLock);
- int nrUdpLists = arrayList_size(handle->udpPartLists);
- int i;
- for(i=0; i < nrUdpLists; i++) {
- udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, i);
- if(udpPartList) {
- if(udpPartList->data) {
- free(udpPartList->data);
- udpPartList->data = NULL;
- }
- free(udpPartList);
- }
- }
- arrayList_destroy(handle->udpPartLists);
- handle->udpPartLists = NULL;
- pthread_mutex_unlock(&handle->dbLock);
- pthread_mutex_destroy(&handle->dbLock);
- free(handle);
- }
-}
-
-//
-// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP.
-//
-int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
-{
- int n;
- int result = 0;
- msg_part_header_t header;
-
- int written = 0;
- header.msg_ident = rand();
- header.total_msg_size = 0;
- for(n = 0; n < len ;n++) {
- header.total_msg_size += largeMsg_iovec[n].iov_len;
- }
- int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
-
- struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
- struct msghdr msg;
- msg.msg_name = dest_addr;
- msg.msg_namelen = addrlen;
- msg.msg_flags = 0;
- msg.msg_iov = msg_iovec;
- msg.msg_iovlen = 2; // header and payload;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- msg.msg_iov[0].iov_base = &header;
- msg.msg_iov[0].iov_len = sizeof(header);
-
- for(n = 0; n < nr_buffers; n++) {
-
- header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE));
- header.offset = n * MAX_PART_SIZE;
- int remainingOffset = header.offset;
- int recvPart = 0;
- // find the start of the part
- while(remainingOffset > largeMsg_iovec[recvPart].iov_len) {
- remainingOffset -= largeMsg_iovec[recvPart].iov_len;
- recvPart++;
- }
- int remainingData = header.part_msg_size;
- int sendPart = 1;
- msg.msg_iovlen = 1;
-
- // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
- while(remainingData > 0) {
- int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? (largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData);
- msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset;
- msg.msg_iov[sendPart].iov_len = partLen;
- remainingData -= partLen;
- remainingOffset = 0;
- sendPart++;
- recvPart++;
- msg.msg_iovlen++;
- }
- int tmp, tmptot;
- for(tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) {
- tmptot += msg.msg_iov[tmp].iov_len;
- }
-
- int w = sendmsg(fd, &msg, 0);
- if(w == -1) {
- perror("send()");
- result = -1;
- break;
- }
- written += w;
- }
-
- return (result == 0 ? written : result);
-}
-
-//
-// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP.
-//
-int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
-{
- int n;
- int nr_buffers = (count / MAX_PART_SIZE) + 1;
- int result = 0;
- msg_part_header_t header;
-
- int written = 0;
- header.msg_ident = rand();
- header.total_msg_size = count;
- char *databuf = buf;
-
- struct iovec msg_iovec[2];
- struct msghdr msg;
- msg.msg_name = dest_addr;
- msg.msg_namelen = addrlen;
- msg.msg_flags = 0;
- msg.msg_iov = msg_iovec;
- msg.msg_iovlen = 2; // header and payload;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- msg.msg_iov[0].iov_base = &header;
- msg.msg_iov[0].iov_len = sizeof(header);
-
- for(n = 0; n < nr_buffers; n++) {
-
- header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE));
- header.offset = n * MAX_PART_SIZE;
- msg.msg_iov[1].iov_base = &databuf[header.offset];
- msg.msg_iov[1].iov_len = header.part_msg_size;
- int w = sendmsg(fd, &msg, 0);
- if(w == -1) {
- perror("send()");
- result = -1;
- break;
- }
- written += w;
- //usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost)
- }
-
- return (result == 0 ? written : result);
-}
-
-//
-// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
-// If the message is completely reassembled true is returned and the index and size have valid values
-//
-bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) {
- msg_part_header_t header;
- int result = false;
- // Only read the header, we don't know yet where to store the payload
- if(recv(fd, &header, sizeof(header), MSG_PEEK) < 0) {
- perror("read()");
- return false;
- }
-
- struct iovec msg_vec[2];
- struct msghdr msg;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_flags = 0;
- msg.msg_iov = msg_vec;
- msg.msg_iovlen = 2; // header and payload;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- msg.msg_iov[0].iov_base = &header;
- msg.msg_iov[0].iov_len = sizeof(header);
-
- pthread_mutex_lock(&handle->dbLock);
-
- int nrUdpLists = arrayList_size(handle->udpPartLists);
- int i;
- bool found = false;
- for(i = 0; i < nrUdpLists; i++) {
- udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
- if(udpPartList->msg_ident == header.msg_ident) {
- found = true;
-
- //sanity check
- if(udpPartList->msg_size != header.total_msg_size) {
- // Corruption occurred. Remove the existing administration and build up a new one.
- arrayList_remove(handle->udpPartLists, i);
- free(udpPartList->data);
- free(udpPartList);
- found = false;
- break;
- }
-
- msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
- msg.msg_iov[1].iov_len = header.part_msg_size;
- recvmsg(fd, &msg, 0);
-
- udpPartList->nrPartsRemaining--;
- if(udpPartList->nrPartsRemaining == 0) {
- *index = i;
- *size = udpPartList->msg_size;
- result = true;
- break;
- } else {
- result = false; // not complete
- break;
- }
- }
- }
-
- if(found == false) {
- udpPartList_pt udpPartList = NULL;
- if(nrUdpLists == handle->maxNrLists) {
- // remove list at index 0
- udpPartList = arrayList_remove(handle->udpPartLists, 0);
- fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining );
- free(udpPartList->data);
- free(udpPartList);
- nrUdpLists--;
- }
- udpPartList = calloc(sizeof(*udpPartList), 1);
- udpPartList->msg_ident = header.msg_ident;
- udpPartList->msg_size = header.total_msg_size;
- udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
- udpPartList->data = calloc(sizeof(char), header.total_msg_size);
-
- msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
- msg.msg_iov[1].iov_len = header.part_msg_size;
- recvmsg(fd, &msg, 0);
-
- arrayList_add(handle->udpPartLists, udpPartList);
-
- if(udpPartList->nrPartsRemaining == 0) {
- *index = nrUdpLists;
- *size = udpPartList->msg_size;
- result = true;
- } else {
- result = false;
- }
-
- }
- pthread_mutex_unlock(&handle->dbLock);
-
- return result;
-}
-
-//
-// Read out the message which is indicated available by the largeUdp_dataAvailable function
-//
-int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size)
-{
- int result = 0;
- pthread_mutex_lock(&handle->dbLock);
-
- udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, index);
- if(udpPartList) {
- *buffer = udpPartList->data;
- free(udpPartList);
- } else {
- result = -1;
- }
- pthread_mutex_unlock(&handle->dbLock);
-
- return result;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
deleted file mode 100644
index 24202dd..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * psa_activator.c
- *
- * \date Sep 30, 2011
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-#include "service_registration.h"
-
-#include "pubsub_admin_impl.h"
-
-struct activator {
- pubsub_admin_pt admin;
- pubsub_admin_service_pt adminService;
- service_registration_pt registration;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator;
-
- activator = calloc(1, sizeof(*activator));
- if (!activator) {
- status = CELIX_ENOMEM;
- }
- else{
- *userData = activator;
- status = pubsubAdmin_create(context, &(activator->admin));
- }
-
- return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
- pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc));
-
- if (!pubsubAdminSvc) {
- status = CELIX_ENOMEM;
- }
- else{
- pubsubAdminSvc->admin = activator->admin;
-
- pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
- pubsubAdminSvc->removePublication = pubsubAdmin_removePublication;
-
- pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
- pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription;
-
- pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
- pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
-
- pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
- pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
-
- activator->adminService = pubsubAdminSvc;
-
- status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
-
- }
-
-
- return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- serviceRegistration_unregister(activator->registration);
- activator->registration = NULL;
-
- pubsubAdmin_stop(activator->admin);
-
- free(activator->adminService);
- activator->adminService = NULL;
-
- return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
- celix_status_t status = CELIX_SUCCESS;
- struct activator *activator = userData;
-
- pubsubAdmin_destroy(activator->admin);
- activator->admin = NULL;
-
- free(activator);
-
- return status;
-}
-
-