You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/01/30 19:30:15 UTC
[31/54] [abbrv] celix git commit: Merge commit
'ee29b00d7a80af43d351e61916d5a5aa90f97e46' into
feature/CELIX-417-cmake-refactor
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/framework/src/celix_launcher.c
----------------------------------------------------------------------
diff --cc framework/src/celix_launcher.c
index ba83f25,0000000..fe5d0c0
mode 100644,000000..100644
--- a/framework/src/celix_launcher.c
+++ b/framework/src/celix_launcher.c
@@@ -1,242 -1,0 +1,315 @@@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * celix_launcher.c
+ *
+ * \date Mar 23, 2010
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include "celix_launcher.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <signal.h>
+
+#ifndef CELIX_NO_CURLINIT
+#include <curl/curl.h>
+#endif
+
+#include <string.h>
+#include <curl/curl.h>
+#include <signal.h>
+#include <libgen.h>
+#include "celix_launcher.h"
+#include "framework.h"
+#include "linked_list_iterator.h"
+
+static void show_usage(char* prog_name);
+static void shutdown_framework(int signal);
+static void ignore(int signal);
+
++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework, properties_pt packedConfig);
++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework, properties_pt packedConfig);
++
+#define DEFAULT_CONFIG_FILE "config.properties"
+
+static framework_pt framework = NULL;
+
++/**
++ * Launches the framework from command-line arguments without a pre-packed
++ * configuration. Kept because examples and unit tests still call it.
++ *
++ * Forwards to celixLauncher_launchWithArgsAndProps() with a NULL property set
++ * and returns that call's result unchanged.
++ */
+int celixLauncher_launchWithArgs(int argc, char *argv[]) {
++    properties_pt noPackedConfig = NULL;
++    return celixLauncher_launchWithArgsAndProps(argc, argv, noPackedConfig);
++}
++
++/**
++ * Command-line entry point: picks the configuration file from argv, installs the
++ * launcher's signal handlers, launches the framework and waits until it has stopped.
++ *
++ * argv[1] is treated as the configuration file path unless it is "-h" or "-help",
++ * in which case the usage text is printed and 0 is returned. Without argv[1] the
++ * path defaults to DEFAULT_CONFIG_FILE. packedConfig is forwarded unchanged to
++ * celixLauncher_launchWithConfigAndProps(); it may be NULL.
++ *
++ * Returns 0 after printing help or after a successful launch followed by shutdown,
++ * otherwise the non-zero result of the launch.
++ */
++int celixLauncher_launchWithArgsAndProps(int argc, char *argv[], properties_pt packedConfig) {
+ // Perform some minimal command-line option parsing...
+ char *opt = NULL;
+ if (argc > 1) {
+ opt = argv[1];
+ }
+
+ char *config_file = NULL;
+
+ if (opt) {
+ // Check whether the user wants some help...
+ if (strcmp("-h", opt) == 0 || strcmp("-help", opt) == 0) {
+ show_usage(argv[0]);
+ return 0;
+ } else {
+ config_file = opt;
+ }
+ } else {
+ config_file = DEFAULT_CONFIG_FILE;
+ }
+
+ struct sigaction sigact;
+ memset(&sigact, 0, sizeof(sigact));
+ sigact.sa_handler = shutdown_framework; // SIGINT/SIGTERM ask the file-global framework to stop
+ sigaction(SIGINT, &sigact, NULL);
+ sigaction(SIGTERM, &sigact, NULL);
+
+ memset(&sigact, 0, sizeof(sigact));
+ sigact.sa_handler = ignore; // SIGUSR1/SIGUSR2 get an empty handler
+ sigaction(SIGUSR1, &sigact, NULL);
+ sigaction(SIGUSR2, &sigact, NULL);
+
- int rc = celixLauncher_launch(config_file, &framework);
++ int rc = celixLauncher_launchWithConfigAndProps(config_file, &framework, packedConfig);
+ if (rc == 0) {
+ // Only a successfully launched framework is waited for and destroyed here
+ celixLauncher_waitForShutdown(framework);
+ celixLauncher_destroy(framework);
+ }
+ return rc;
+}
+
+/* Prints the usage line to stdout, showing only the base name of prog_name. */
+static void show_usage(char* prog_name) {
+    const char *name = basename(prog_name);
+    printf("Usage:\n %s [path/to/config.properties]\n\n", name);
+}
+
+/*
+ * Signal handler installed for SIGINT and SIGTERM: asks the file-global
+ * framework to stop, if one has been set. It does not destroy the framework.
+ */
+static void shutdown_framework(int signal) {
+    if (framework == NULL) {
+        return;
+    }
+    celixLauncher_stop(framework); //NOTE main thread will destroy
+}
+
+/* Deliberately empty handler installed for SIGUSR1 and SIGUSR2. */
+static void ignore(int signal) {
+    (void) signal; //ignoring for signal SIGUSR1, SIGUSR2. Can be used on threads
+}
+
+/*
+ * Launches a framework from the properties file at configFile, without any
+ * pre-packed configuration. Returns the result of
+ * celixLauncher_launchWithConfigAndProps() unchanged.
+ */
+int celixLauncher_launch(const char *configFile, framework_pt *framework) {
++    properties_pt noPackedConfig = NULL;
++    return celixLauncher_launchWithConfigAndProps(configFile, framework, noPackedConfig);
++}
++
++/**
++ * Opens configFile and dispatches to the launch variant that matches which
++ * configuration sources are available:
++ *  - file opened and packedConfig given: both, via celixLauncher_launchWithStreamAndProps()
++ *  - file opened only:                   celixLauncher_launchWithStream()
++ *  - packedConfig only:                  celixLauncher_launchWithProperties()
++ *  - neither:                            an error is written to stderr and 1 is returned
++ *
++ * The opened stream is handed to the callee, which closes it.
++ * Returns the callee's status, or 1 when no configuration source is usable.
++ */
++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework, properties_pt packedConfig){
+ int status = 0;
+ FILE *config = fopen(configFile, "r");
- if (config != NULL) {
++
++ if (config != NULL && packedConfig != NULL) {
++ status = celixLauncher_launchWithStreamAndProps(config, framework, packedConfig);
++ } else if (config != NULL) {
+ status = celixLauncher_launchWithStream(config, framework);
++ } else if (packedConfig != NULL) {
++ status = celixLauncher_launchWithProperties(packedConfig, framework);
+ } else {
+ fprintf(stderr, "Error: invalid or non-existing configuration file: '%s'.", configFile);
+ perror(""); // appends the errno text to the message started above
+ status = 1;
+ }
++
+ return status;
+}
+
+/*
+ * Loads properties from stream, closes the stream, and launches a framework
+ * with the loaded properties.
+ *
+ * Returns the result of celixLauncher_launchWithProperties(), or 1 (after
+ * writing an error to stderr) when no properties could be loaded.
+ */
+int celixLauncher_launchWithStream(FILE *stream, framework_pt *framework) {
+    properties_pt loaded = properties_loadWithStream(stream);
+    fclose(stream);
+
+    // Make sure we've read it and that nothing went wrong with the file access...
+    if (loaded != NULL) {
+        return celixLauncher_launchWithProperties(loaded, framework);
+    }
+
+    fprintf(stderr, "Error: invalid configuration file");
+    perror(NULL);
+    return 1;
+}
+
++/**
++ * Launches a framework from two configuration sources: the properties read from
++ * stream (the runtime config) and packedConfig (the pre-compiled config).
++ *
++ * stream is always closed. When both sources are available they are merged into
++ * the runtime config; on a duplicate key the runtime value wins. packedConfig is
++ * then destroyed, because only the merged set is handed on.
++ * When the stream yields no properties, packedConfig itself is used for the launch
++ * and is NOT destroyed here (destroying it before the launch would hand a
++ * dangling pointer to celixLauncher_launchWithProperties()).
++ *
++ * Returns the result of celixLauncher_launchWithProperties(), or 1 (after writing
++ * an error to stderr) when neither source provides a configuration.
++ */
++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework, properties_pt packedConfig){
++    properties_pt runtimeConfig = properties_loadWithStream(stream);
++    fclose(stream);
++
++    if (runtimeConfig == NULL) {
++        if (packedConfig == NULL) {
++            fprintf(stderr, "Error: invalid configuration file");
++            perror(NULL);
++            return 1;
++        }
++        // If there is no runtimeConfig, the packedConfig becomes the global config as-is:
++        // nothing to merge, and it must stay alive for the launch.
++        return celixLauncher_launchWithProperties(packedConfig, framework);
++    }
++
++    // Check if there's a pre-compiled config available
++    if (packedConfig != NULL) {
++        // runtimeConfig and packedConfig must be merged
++        // when a duplicate of a key is available, the runtimeConfig must be prioritized
++        hash_map_iterator_t iter = hashMapIterator_construct(packedConfig);
++        hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
++
++        while (entry != NULL) {
++            const char *key = (const char *) hashMapEntry_getKey(entry);
++            const char *value = (const char *) hashMapEntry_getValue(entry);
++
++            // Only take over keys the runtime config does not define itself
++            if (!hashMap_containsKey(runtimeConfig, key)) {
++                // NOTE(review): assumes properties_set() stores its own copies of key/value,
++                // since packedConfig is destroyed right after the loop - confirm.
++                properties_set(runtimeConfig, key, value);
++            }
++
++            entry = hashMapIterator_nextEntry(&iter);
++        }
++
++        // normally, the framework_destroy will clean up the properties_pt
++        // since there are 2 properties_pt available (runtimeConfig and packedConfig)
++        // the packedConfig must be destroyed
++        properties_destroy(packedConfig);
++    }
++
++    return celixLauncher_launchWithProperties(runtimeConfig, framework);
++}
+
+/**
+ * Creates, initializes and starts a framework from an in-memory configuration, then
+ * installs and starts the bundles listed in the "cosgi.auto.start.1" property.
+ *
+ * config     properties handed to framework_create(); also the source of the
+ *            space-separated list of bundle locations to auto-start (optional).
+ * framework  out-parameter that receives the created framework.
+ *
+ * Returns the first failing status from framework_create(), fw_init() or
+ * framework_getFrameworkBundle(), otherwise CELIX_SUCCESS. A bundle that cannot be
+ * installed is reported on stdout and skipped; it does not change the return value.
+ */
+int celixLauncher_launchWithProperties(properties_pt config, framework_pt *framework) {
+    celix_status_t status;
+#ifndef CELIX_NO_CURLINIT
+    // Before doing anything else, let's setup Curl
+    curl_global_init(CURL_GLOBAL_NOTHING);
+#endif
+
+    // Work on a private copy of the list: strtok_r() below writes into the string it splits.
+    const char* autoStartProp = properties_get(config, "cosgi.auto.start.1");
+    char* autoStart = NULL;
+    if (autoStartProp != NULL) {
+        autoStart = strndup(autoStartProp, 1024*10);
+    }
+
+    status = framework_create(framework, config);
+    bundle_pt fwBundle = NULL;
+    if (status == CELIX_SUCCESS) {
+        status = fw_init(*framework);
+        if (status == CELIX_SUCCESS) {
+            // Start the system bundle
+            status = framework_getFrameworkBundle(*framework, &fwBundle);
+
+            if(status == CELIX_SUCCESS){
+                bundle_start(fwBundle);
+
+                char delims[] = " ";
+                char *result = NULL;
+                char *save_ptr = NULL;
+                linked_list_pt bundles;
+                array_list_pt installed = NULL;
+                bundle_context_pt context = NULL;
+                linked_list_iterator_pt iter = NULL;
+                unsigned int i;
+
+                linkedList_create(&bundles);
+                // Without an auto-start property there is nothing to split. Calling
+                // strtok_r(NULL, ...) with a NULL save pointer would dereference NULL.
+                if (autoStart != NULL) {
+                    result = strtok_r(autoStart, delims, &save_ptr);
+                }
+                while (result != NULL) {
+                    char *location = strdup(result);
+                    linkedList_addElement(bundles, location);
+                    result = strtok_r(NULL, delims, &save_ptr);
+                }
+                // First install all bundles
+                // Afterwards start them
+                arrayList_create(&installed);
+                bundle_getContext(fwBundle, &context);
+                iter = linkedListIterator_create(bundles, 0);
+                while (linkedListIterator_hasNext(iter)) {
+                    bundle_pt current = NULL;
+                    char *location = (char *) linkedListIterator_next(iter);
+                    if (bundleContext_installBundle(context, location, &current) == CELIX_SUCCESS) {
+                        // Only add bundle if it is installed correctly
+                        arrayList_add(installed, current);
+                    } else {
+                        printf("Could not install bundle from %s\n", location);
+                    }
+                    // The location copy was only needed for the install call
+                    linkedListIterator_remove(iter);
+                    free(location);
+                }
+                linkedListIterator_destroy(iter);
+                linkedList_destroy(bundles);
+
+                for (i = 0; i < arrayList_size(installed); i++) {
+                    bundle_pt installedBundle = (bundle_pt) arrayList_get(installed, i);
+                    bundle_startWithOptions(installedBundle, 0);
+                }
+
+                arrayList_destroy(installed);
+            }
+        }
+    }
+
+    // Report exactly one outcome; "Started" used to be printed after a failure as well.
+    if (status != CELIX_SUCCESS) {
+        printf("Problem creating framework\n");
+    } else {
+        printf("Launcher: Framework Started\n");
+    }
+
+    free(autoStart);
+
+    return status;
+}
+
+/* Thin wrapper: delegates to framework_waitForStop() for the given framework. */
+void celixLauncher_waitForShutdown(framework_pt framework)
+{
+    framework_waitForStop(framework);
+}
+
+/*
+ * Destroys the framework, releases the global Curl state (unless the launcher
+ * was built with CELIX_NO_CURLINIT), and prints an exit notice to stdout.
+ */
+void celixLauncher_destroy(framework_pt framework)
+{
+    framework_destroy(framework);
+
+#ifndef CELIX_NO_CURLINIT
+    /* Counterpart of the curl_global_init() done at launch */
+    curl_global_cleanup();
+#endif
+
+    printf("Launcher: Exit\n");
+}
+
+/*
+ * Stops the framework by stopping its framework bundle. Does nothing when the
+ * framework bundle cannot be obtained.
+ */
+void celixLauncher_stop(framework_pt framework) {
+    bundle_pt systemBundle = NULL;
+    celix_status_t rc = framework_getFrameworkBundle(framework, &systemBundle);
+    if (rc != CELIX_SUCCESS) {
+        return;
+    }
+    bundle_stop(systemBundle);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index e43ec29,0000000..44106df
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@@ -1,444 -1,0 +1,437 @@@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
- * htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0
++ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
- /*
- * topic_publication.c
- *
- * \date Sep 24, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_registration.h"
+#include "utils.h"
+#include "service_factory.h"
+#include "version.h"
+
+#include "topic_publication.h"
+#include "pubsub_common.h"
+#include "publisher.h"
+#include "large_udp.h"
+
+#include "pubsub_serializer.h"
+
+#define EP_ADDRESS_LEN 32
+
+#define FIRST_SEND_DELAY 2
+
+struct topic_publication { /* state of one published topic on the UDP multicast admin */
+ int sendSocket; /* socket used by send_pubsub_msg(); closed in pubsub_topicPublicationDestroy() */
+ char* endpoint; /* heap string "udp://<bindIP>:<port>" built at creation; freed on destroy */
+ service_registration_pt svcFactoryReg; /* registration of the publisher service factory, set by pubsub_topicPublicationStart() */
+ array_list_pt pub_ep_list; //List<pubsub_endpoint>
+ hash_map_pt boundServices; //<bundle_pt,bound_service>
+ celix_thread_mutex_t tp_lock; /* taken around accesses to pub_ep_list and boundServices, and while sending */
+ pubsub_serializer_service_t *serializer; /* used to create/destroy the per-bundle serializer maps */
+ struct sockaddr_in destAddr; /* destination (bindIP, port) passed to largeUdp_sendmsg() */
+};
+
+typedef struct publish_bundle_bound_service { /* publisher service instance created per requesting bundle */
+ topic_publication_pt parent; /* topic publication this instance belongs to */
+ pubsub_publisher_t service; /* service struct whose address is handed out by getService */
+ bundle_pt bundle; /* bundle the instance was created for */
+ char *scope; /* strdup'ed copy of the first publisher endpoint's scope */
+ char *topic; /* strdup'ed copy of the first publisher endpoint's topic */
+ hash_map_pt msgTypes; /* filled by the serializer's createSerializerMap(); looked up by message type id when sending */
+ unsigned short getCount; /* outstanding getService calls; the instance is destroyed when it drops to 0 */
+ celix_thread_mutex_t mp_lock; /* taken while sending and while destroying this instance */
+ largeUdp_pt largeUdpHandle; /* handle from largeUdp_create(), used by largeUdp_sendmsg() */
+}* publish_bundle_bound_service_pt;
+
+
+typedef struct pubsub_msg{ /* one outgoing message; sent as three iovec parts in this field order */
+ pubsub_msg_header_pt header; /* topic, type id and version of the message */
+ char* payload; /* serialized message bytes */
+ unsigned int payloadSize; /* number of bytes in payload */
+} pubsub_msg_t;
+
+
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
+
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
+
+
+static void delay_first_send_for_late_joiners(void);
+
+
+/**
+ * Allocates a topic publication that sends on sendSocket and registers pubEP as
+ * its first publisher endpoint.
+ *
+ * sendSocket       socket used for sending; closed by pubsub_topicPublicationDestroy().
+ * pubEP            first publisher endpoint; its endpoint field is set to a copy of the
+ *                  generated "udp://<bindIP>:<port>" string.
+ * best_serializer  serializer service stored for later use (not copied).
+ * bindIP           dotted IP address used both in the endpoint string and as destination.
+ * out              receives the new publication on success; untouched on failure.
+ *
+ * Returns CELIX_SUCCESS, or CELIX_ENOMEM when an allocation fails.
+ */
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){
+
+    // calloc gives a zeroed buffer, so the endpoint string is always terminated
+    char* ep = calloc(1, EP_ADDRESS_LEN);
+    topic_publication_pt pub = calloc(1,sizeof(*pub));
+    if (ep == NULL || pub == NULL) {
+        free(ep);
+        free(pub);
+        return CELIX_ENOMEM;
+    }
+
+    // NOTE(review): serviceID is added on top of a value that may already be UDP_MAX_PORT,
+    // so the sum can exceed UDP_MAX_PORT - confirm this is intended.
+    unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
+    snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
+
+    arrayList_create(&(pub->pub_ep_list));
+    pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
+    celixThreadMutex_create(&(pub->tp_lock),NULL);
+
+    pub->endpoint = ep;
+    pub->sendSocket = sendSocket;
+    pub->destAddr.sin_family = AF_INET;
+    pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
+    pub->destAddr.sin_port = htons(port);
+
+    pub->serializer = best_serializer;
+
+    pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+
+    *out = pub;
+
+    return CELIX_SUCCESS;
+}
+
+/*
+ * Releases everything owned by pub: the endpoint string, the endpoint list, all
+ * bundle-bound services, the send socket, the lock, and pub itself.
+ *
+ * Returns CELIX_FILE_IO_EXCEPTION when closing the socket fails, otherwise
+ * CELIX_SUCCESS. pub is freed in both cases.
+ */
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
+    celix_status_t result = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&(pub->tp_lock));
+
+    free(pub->endpoint);
+    arrayList_destroy(pub->pub_ep_list);
+
+    // Destroy the values first; the map itself is destroyed without freeing keys/values
+    hash_map_iterator_pt it = hashMapIterator_create(pub->boundServices);
+    for (; hashMapIterator_hasNext(it); ) {
+        publish_bundle_bound_service_pt boundSvc = hashMapIterator_nextValue(it);
+        pubsub_destroyPublishBundleBoundService(boundSvc);
+    }
+    hashMapIterator_destroy(it);
+    hashMap_destroy(pub->boundServices,false,false);
+
+    pub->svcFactoryReg = NULL;
+    pub->serializer = NULL;
+
+    int closeRc = close(pub->sendSocket);
+    if (closeRc != 0) {
+        result = CELIX_FILE_IO_EXCEPTION;
+    }
+
+    celixThreadMutex_unlock(&(pub->tp_lock));
+
+    celixThreadMutex_destroy(&(pub->tp_lock));
+
+    free(pub);
+
+    return result;
+}
+
+/**
+ * Registers a publisher service factory for pub, using the scope and topic of the
+ * publication's first endpoint as service properties.
+ *
+ * bundle_context  context used for the registration.
+ * pub             publication to expose; becomes the factory's handle.
+ * svcFactory      receives the allocated factory on success; untouched on failure.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when the factory cannot be allocated,
+ * CELIX_SERVICE_EXCEPTION when the publication has no endpoint, or the status of
+ * bundleContext_registerServiceFactory() when registration fails.
+ */
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
+    celix_status_t status = CELIX_SUCCESS;
+
+    /* Let's register the new service */
+
+    pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+
+    if(pubEP==NULL){
+        printf("PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
+        return CELIX_SERVICE_EXCEPTION;
+    }
+
+    service_factory_pt factory = calloc(1, sizeof(*factory));
+    if(factory==NULL){
+        return CELIX_ENOMEM;
+    }
+    factory->handle = pub;
+    factory->getService = pubsub_topicPublicationGetService;
+    factory->ungetService = pubsub_topicPublicationUngetService;
+
+    properties_pt props = properties_create();
+    properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
+    properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
+
+    status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
+
+    if(status != CELIX_SUCCESS){
+        properties_destroy(props);
+        printf("PSA_UDP_MC_TP: Cannot register ServiceFactory for scope %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
+        // The factory is not handed to the caller on failure, so release it here
+        free(factory);
+    }
+    else{
+        *svcFactory = factory;
+    }
+
+    return status;
+}
+
+/* Unregisters the publication's service factory registration and returns that status. */
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
+    celix_status_t status = serviceRegistration_unregister(pub->svcFactoryReg);
+    return status;
+}
+
+/*
+ * Adds ep to the publication's endpoint list and gives it its own copy of the
+ * publication's endpoint string. Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+    celix_thread_mutex_t *lock = &(pub->tp_lock);
+
+    celixThreadMutex_lock(lock);
+    ep->endpoint = strdup(pub->endpoint);
+    arrayList_add(pub->pub_ep_list,ep);
+    celixThreadMutex_unlock(lock);
+
+    return CELIX_SUCCESS;
+}
+
+/* Removes ep from the publication's endpoint list. Always returns CELIX_SUCCESS. */
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+    celix_thread_mutex_t *lock = &(pub->tp_lock);
+
+    celixThreadMutex_lock(lock);
+    arrayList_removeElement(pub->pub_ep_list,ep);
+    celixThreadMutex_unlock(lock);
+
+    return CELIX_SUCCESS;
+}
+
+/* Returns a clone of the publication's endpoint list, made while holding the publication lock. */
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
+    celixThreadMutex_lock(&(pub->tp_lock));
+    array_list_pt snapshot = arrayList_clone(pub->pub_ep_list);
+    celixThreadMutex_unlock(&(pub->tp_lock));
+
+    return snapshot;
+}
+
+
+/**
+ * Service-factory getService callback: hands the requesting bundle its publisher
+ * service, creating the bundle-bound instance on first use and counting every
+ * further request.
+ *
+ * handle   the topic_publication_pt registered as factory handle.
+ * bundle   requesting bundle; key into the publication's boundServices map.
+ * service  receives the address of the bundle's pubsub_publisher_t, or NULL on failure.
+ *
+ * Returns CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when the bundle-bound service
+ * cannot be created (previously this case reported success without setting *service).
+ */
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    topic_publication_pt publish = (topic_publication_pt)handle;
+
+    celixThreadMutex_lock(&(publish->tp_lock));
+
+    publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+    if(bound==NULL){
+        // First request from this bundle: a new instance starts with getCount == 1
+        bound = pubsub_createPublishBundleBoundService(publish,bundle);
+        if(bound!=NULL){
+            hashMap_put(publish->boundServices,bundle,bound);
+        }
+    }
+    else{
+        bound->getCount++;
+    }
+
+    if (bound != NULL) {
+        *service = &bound->service;
+    }
+    else {
+        *service = NULL;
+        status = CELIX_SERVICE_EXCEPTION;
+    }
+
+    celixThreadMutex_unlock(&(publish->tp_lock));
+
+    return status;
+}
+
+/*
+ * Service-factory ungetService callback: drops one reference on the bundle's
+ * bound service and destroys it when the count reaches zero. An unget for a
+ * bundle without a bound service is only logged. *service is always reset to
+ * NULL. Always returns CELIX_SUCCESS.
+ */
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
+
+    topic_publication_pt publication = (topic_publication_pt)handle;
+
+    celixThreadMutex_lock(&(publication->tp_lock));
+
+    publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt)hashMap_get(publication->boundServices,bundle);
+    if(boundSvc==NULL){
+        long id = -1;
+        bundle_getBundleId(bundle,&id);
+        printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", id);
+    }
+    else{
+        boundSvc->getCount--;
+        if(boundSvc->getCount==0){
+            pubsub_destroyPublishBundleBoundService(boundSvc);
+            hashMap_remove(publication->boundServices,bundle);
+        }
+    }
+
+    /* service should be never used for unget, so let's set the pointer to NULL */
+    *service = NULL;
+
+    celixThreadMutex_unlock(&(publication->tp_lock));
+
+    return CELIX_SUCCESS;
+}
+
+/*
+ * Sends msg over the parent publication's socket as three parts (header,
+ * payload size, payload) through largeUdp_sendmsg(). The very first send in the
+ * process is delayed by delay_first_send_for_late_joiners(). When a release
+ * callback is given it is invoked with the payload after the send attempt.
+ *
+ * Returns true when the send succeeded, false (after perror) when it failed.
+ */
+static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
+    enum { PART_COUNT = 3 }; // header + size + payload
+
+    struct iovec parts[PART_COUNT];
+    parts[0].iov_base = msg->header;
+    parts[0].iov_len = sizeof(*msg->header);
+    parts[1].iov_base = &msg->payloadSize;
+    parts[1].iov_len = sizeof(msg->payloadSize);
+    parts[2].iov_base = msg->payload;
+    parts[2].iov_len = msg->payloadSize;
+
+    delay_first_send_for_late_joiners();
+
+    topic_publication_pt owner = bound->parent;
+    bool sent = largeUdp_sendmsg(bound->largeUdpHandle, owner->sendSocket, parts, PART_COUNT, 0, &owner->destAddr, sizeof(owner->destAddr)) != -1;
+    if(!sent) {
+        perror("send_pubsub_msg:sendSocket");
+    }
+
+    if(releaseCallback) {
+        releaseCallback->release(msg->payload, bound);
+    }
+    return sent;
+
+}
+
+
+/*
+ * Publisher "send" callback: serializes inMsg with the serializer registered for
+ * msgTypeId and sends it on the topic of the bound service.
+ *
+ * handle is the publish_bundle_bound_service_pt stored in the service struct.
+ * Both the publication lock and the bound service's lock are held for the whole
+ * call, including the actual send.
+ *
+ * Returns 0 on success, -1 when no serializer is known for msgTypeId or the send fails.
+ */
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+ int status = 0;
+ publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+
+ celixThreadMutex_lock(&(bound->parent->tp_lock));
+ celixThreadMutex_lock(&(bound->mp_lock));
+
- pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
++ pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(intptr_t)msgTypeId);
+
+ if (msgSer != NULL) {
+ int major=0, minor=0;
+
+ pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
+ strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); // header is zeroed by calloc, so the copy stays terminated
+ msg_hdr->type = msgTypeId;
+
+
+ if (msgSer->msgVersion != NULL){
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
+ msg_hdr->major = major;
+ msg_hdr->minor = minor;
+ }
+
+ void* serializedOutput = NULL;
+ size_t serializedOutputLen = 0;
+ msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); // NOTE(review): the result of serialize() is not checked
+
+ pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
+ msg->header = msg_hdr;
+ msg->payload = (char*)serializedOutput;
+ msg->payloadSize = serializedOutputLen;
+
+
+ if(send_pubsub_msg(bound, msg,true, NULL) == false) {
+ status = -1;
+ }
+ // No release callback was passed, so header, wrapper and payload are freed here
+ free(msg_hdr);
+ free(msg);
+ free(serializedOutput);
+
+
+ } else {
+ printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %d\n", msgTypeId);
+ status=-1;
+ }
+
+ celixThreadMutex_unlock(&(bound->mp_lock));
+ celixThreadMutex_unlock(&(bound->parent->tp_lock));
+
+ return status;
+}
+
+/*
+ * Publisher callback that maps a message type name to its local type id, which
+ * is the string hash of the name. handle is not used. Always returns 0.
+ */
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
+    unsigned int typeHash = utils_stringHash(msgType);
+    *msgTypeId = typeHash;
+    return 0;
+}
+
+
+/**
+ * Returns a pseudo-random value in the closed range [min, max], drawn from random().
+ *
+ * When max <= min the range is empty or a single value, and min is returned.
+ * (The previous form underflowed the unsigned range for max < min and could
+ * return max + 1 when random() yielded its largest value.)
+ */
+static unsigned int rand_range(unsigned int min, unsigned int max){
+
+    if (max <= min) {
+        return min;
+    }
+
+    // random() yields 0 .. 2^31-1; dividing by 2^31 keeps scaled strictly below 1.0,
+    // so the scaled offset can never reach (max - min + 1).
+    double scaled = (double)random() / 2147483648.0;
+    double span = (double)max - (double)min + 1.0;
+    return (unsigned int)(span * scaled) + min;
+
+}
+
+/*
+ * Allocates the publisher service instance for one bundle: creates its lock, asks
+ * the serializer (if any) for the bundle's serializer map, copies scope and topic
+ * from the publication's first endpoint, creates a large-UDP handle, and fills in
+ * the publisher callbacks. The new instance starts with getCount == 1.
+ *
+ * Returns the new instance, or NULL when it cannot be allocated.
+ */
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+
+    publish_bundle_bound_service_pt svc = calloc(1, sizeof(*svc));
+    if (svc == NULL) {
+        return svc;
+    }
+
+    svc->parent = tp;
+    svc->bundle = bundle;
+    svc->getCount = 1;
+    celixThreadMutex_create(&svc->mp_lock,NULL);
+
+    if(tp->serializer != NULL){
+        tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&svc->msgTypes);
+    }
+
+    pubsub_endpoint_pt firstEP = (pubsub_endpoint_pt)arrayList_get(svc->parent->pub_ep_list,0);
+    svc->scope=strdup(firstEP->scope);
+    svc->topic=strdup(firstEP->topic);
+    svc->largeUdpHandle = largeUdp_create(1);
+
+    svc->service.handle = svc;
+    svc->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+    svc->service.send = pubsub_topicPublicationSend;
+    svc->service.sendMultipart = NULL; //Multipart not supported for UDP
+
+    return svc;
+}
+
+/*
+ * Releases a bundle-bound publisher service: its serializer map (when both a
+ * serializer and a map exist), the scope and topic copies, the large-UDP handle,
+ * its lock, and the instance itself.
+ */
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+
+    celixThreadMutex_lock(&boundSvc->mp_lock);
+
+    pubsub_serializer_service_t *ser = boundSvc->parent->serializer;
+    if(ser != NULL && boundSvc->msgTypes != NULL){
+        ser->destroySerializerMap(ser->handle, boundSvc->msgTypes);
+    }
+
+    /* free(NULL) is a no-op, so no NULL checks are needed */
+    free(boundSvc->scope);
+    free(boundSvc->topic);
+
+    largeUdp_destroy(boundSvc->largeUdpHandle);
+
+    celixThreadMutex_unlock(&boundSvc->mp_lock);
+    celixThreadMutex_destroy(&boundSvc->mp_lock);
+
+    free(boundSvc);
+
+}
+
+/*
+ * Sleeps FIRST_SEND_DELAY seconds the first time it is called (process-wide,
+ * tracked in a function-local static) and logs that it does so. Later calls
+ * return immediately.
+ */
+static void delay_first_send_for_late_joiners(){
+
+    static bool firstSend = true;
+
+    if(!firstSend){
+        return;
+    }
+
+    printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+    sleep(FIRST_SEND_DELAY);
+    firstSend = false;
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/shell/CMakeLists.txt
----------------------------------------------------------------------
diff --cc shell/CMakeLists.txt
index ae8cf3f,11a16c1..b8aaac3
--- a/shell/CMakeLists.txt
+++ b/shell/CMakeLists.txt
@@@ -18,35 -18,38 +18,35 @@@ celix_subproject(SHELL "Option to enabl
if (SHELL)
find_package(CURL REQUIRED)
+ add_library(shell_api INTERFACE) # interface-only target that carries the shell API include directory
+ target_include_directories(shell_api INTERFACE include)
+
add_bundle(shell
SYMBOLIC_NAME "apache_celix_shell"
- VERSION "2.0.0"
+ VERSION "2.1.0"
NAME "Apache Celix Shell"
-
SOURCES
+ src/activator
+ src/shell
+ src/lb_command
+ src/start_command
+ src/stop_command
+ src/install_command
+ src/update_command
+ src/uninstall_command
+ src/log_command
+ src/inspect_command
+ src/help_command
+ )
+ target_include_directories(shell PRIVATE src ${CURL_INCLUDE_DIRS})
+ target_link_libraries(shell PRIVATE Celix::shell_api ${CURL_LIBRARIES} Celix::log_service_api Celix::log_helper) # replaces the directory-wide include_directories() calls removed below
- private/src/activator
- private/src/shell
- private/src/lb_command
- private/src/start_command
- private/src/stop_command
- private/src/install_command
- private/src/update_command
- private/src/uninstall_command
- private/src/log_command
- private/src/inspect_command
- private/src/help_command
-
- ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
-
- )
-
- install_bundle(shell
+ install_bundle(shell
HEADERS
- public/include/shell.h public/include/command.h public/include/shell_constants.h
- )
+ include/shell.h include/command.h include/shell_constants.h
+ )
- include_directories("public/include")
- include_directories("private/include")
- include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
- include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
- include_directories(${CURL_INCLUDE_DIRS})
- target_link_libraries(shell celix_framework ${CURL_LIBRARIES})
+ #Setup target aliases to match external usage
+ add_library(Celix::shell_api ALIAS shell_api)
+ add_library(Celix::shell ALIAS shell)
endif (SHELL)
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/include/properties.h
----------------------------------------------------------------------
diff --cc utils/include/properties.h
index cf93ca0,0000000..5c6dc4d
mode 100644,000000..100644
--- a/utils/include/properties.h
+++ b/utils/include/properties.h
@@@ -1,66 -1,0 +1,68 @@@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * properties.h
+ *
+ * \date Apr 27, 2010
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PROPERTIES_H_
+#define PROPERTIES_H_
+
+#include <stdio.h>
+
+#include "hash_map.h"
+#include "exports.h"
+#include "celix_errno.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+typedef hash_map_pt properties_pt;
+typedef hash_map_t properties_t;
+
+UTILS_EXPORT properties_pt properties_create(void);
+
+UTILS_EXPORT void properties_destroy(properties_pt properties);
+
+UTILS_EXPORT properties_pt properties_load(const char *filename);
+
+UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream);
+
++UTILS_EXPORT properties_pt properties_loadFromString(const char *input);
++
+UTILS_EXPORT void properties_store(properties_pt properties, const char *file, const char *header);
+
+UTILS_EXPORT const char *properties_get(properties_pt properties, const char *key);
+
+UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, const char *key, const char *defaultValue);
+
+UTILS_EXPORT void properties_set(properties_pt properties, const char *key, const char *value);
+
+UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, properties_pt *copy);
+
+/*
+ * Loop header that visits every key of props, assigning each one to key (an
+ * lvalue of type const char*) before the loop body runs.
+ *
+ * The macro declares a local named iter in the for-init clause, so the name iter
+ * is taken inside the loop.
+ * NOTE(review): the condition uses the comma operator, so the result of
+ * hashMapIterator_hasNext() is discarded and the loop ends only when
+ * hashMapIterator_nextKey() yields NULL - confirm nextKey returns NULL at the end.
+ */
+#define PROPERTIES_FOR_EACH(props, key) \
+ for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
+ hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);)
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROPERTIES_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/src/properties.c
----------------------------------------------------------------------
diff --cc utils/src/properties.c
index 0bd6dc3,0000000..1e097a0
mode 100644,000000..100644
--- a/utils/src/properties.c
+++ b/utils/src/properties.c
@@@ -1,302 -1,0 +1,330 @@@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * properties.c
+ *
+ * \date Apr 27, 2010
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include "celixbool.h"
+#include "properties.h"
+#include "utils.h"
+
+/* Initial size, and growth step, in bytes of the key and value buffers used while parsing a line. */
+#define MALLOC_BLOCK_SIZE 5
+
+/* Parses a single "key=value" line and stores the result in props (defined at the end of this file). */
+static void parseLine(const char* line, properties_pt props);
+
+/**
+ * Creates an empty properties set: a hash map set up with the string hash and
+ * string equality helpers from utils.h.
+ */
+properties_pt properties_create(void) {
+    properties_pt created = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
+    return created;
+}
+
+/**
+ * Frees all keys and values held by properties and then the map itself.
+ *
+ * Passing NULL is a no-op, so the result of a failed properties_load() can be
+ * handed to this function without an extra check.
+ */
+void properties_destroy(properties_pt properties) {
+    if (properties == NULL) {
+        return;
+    }
+
+    hash_map_iterator_pt iter = hashMapIterator_create(properties);
+    while (hashMapIterator_hasNext(iter)) {
+        hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+        free(hashMapEntry_getKey(entry));
+        free(hashMapEntry_getValue(entry));
+    }
+    hashMapIterator_destroy(iter);
+
+    //keys and values were released above, so the map must not free them again
+    hashMap_destroy(properties, false, false);
+}
+
+/**
+ * Loads a properties set from the file called filename.
+ * Returns NULL when the file cannot be opened for reading.
+ */
+properties_pt properties_load(const char* filename) {
+    properties_pt loaded = NULL;
+    FILE *stream = fopen(filename, "r");
+
+    if (stream != NULL) {
+        loaded = properties_loadWithStream(stream);
+        fclose(stream);
+    }
+
+    return loaded;
+}
+
+/**
+ * Reads the complete content of file and parses it, one line at a time, into a
+ * newly created properties set.
+ *
+ * Returns NULL when file is NULL or the set cannot be created. When the size
+ * of the stream cannot be determined (e.g. it is not seekable), the stream is
+ * empty, or the read buffer cannot be allocated, an empty set is returned.
+ * The stream is left open; closing it is up to the caller.
+ */
+properties_pt properties_loadWithStream(FILE *file) {
+    if (file == NULL) {
+        return NULL;
+    }
+
+    properties_pt props = properties_create();
+    if (props == NULL) {
+        return NULL;
+    }
+
+    //ftell reports errors as -1, so keep the result signed until it is validated
+    long end = -1L;
+    if (fseek(file, 0, SEEK_END) == 0) {
+        end = ftell(file);
+    }
+    if (end < 0 || fseek(file, 0, SEEK_SET) != 0) {
+        fprintf(stderr, "properties_loadWithStream: cannot determine size of stream\n");
+        return props;
+    }
+    if (end == 0) {
+        return props;
+    }
+
+    size_t file_size = (size_t) end;
+    char *filebuffer = calloc(file_size + 1, sizeof(char));
+    if (filebuffer == NULL) {
+        return props;
+    }
+
+    size_t rs = fread(filebuffer, sizeof(char), file_size, file);
+    if (rs != file_size) {
+        fprintf(stderr,"fread read only %zu bytes out of %zu\n",rs,file_size);
+    }
+    //terminate after what was actually read; a short read is still parsed
+    filebuffer[rs] = '\0';
+
+    char *saveptr = NULL;
+    for (char *line = strtok_r(filebuffer, "\n", &saveptr); line != NULL; line = strtok_r(NULL, "\n", &saveptr)) {
+        parseLine(line, props);
+    }
+    free(filebuffer);
+
+    return props;
+}
+
++/**
++ * Parses the newline separated text in input into a newly created properties set.
++ *
++ * The input is copied before it is tokenized, so the caller's string is left
++ * untouched. Returns NULL when input is NULL, when the copy cannot be
++ * allocated or when the set cannot be created.
++ */
++properties_pt properties_loadFromString(const char *input){
++    if (input == NULL) {
++        return NULL;
++    }
++
++    //strtok_r writes into its argument, hence the private copy
++    char *in = strdup(input);
++    if (in == NULL) {
++        return NULL;
++    }
++
++    properties_pt props = properties_create();
++    if (props != NULL) {
++        char *saveLinePointer = NULL;
++        for (char *line = strtok_r(in, "\n", &saveLinePointer); line != NULL; line = strtok_r(NULL, "\n", &saveLinePointer)) {
++            parseLine(line, props);
++        }
++    }
++
++    free(in);
++
++    return props;
++}
++
+
+/**
+ * Writes str to file, putting a backslash in front of every '#', '!', '=' and
+ * ':' so these characters are not read back as comment markers or separators.
+ */
+static void properties_writeEscaped(FILE *file, const char *str) {
+    for (const char *c = str; *c != '\0'; ++c) {
+        if (*c == '#' || *c == '!' || *c == '=' || *c == ':') {
+            fputc('\\', file);
+        }
+        fputc(*c, file);
+    }
+}
+
+/**
+ * Stores all entries of properties in the file called filename, one
+ * "key=value" line per entry. An existing file is overwritten.
+ *
+ * Header is ignored for now, cannot handle comments yet
+ */
+void properties_store(properties_pt properties, const char* filename, const char* header) {
+    FILE *file = fopen ( filename, "w+" );
+
+    if (file == NULL) {
+        perror("File is null");
+        return;
+    }
+
+    hash_map_iterator_pt iterator = hashMapIterator_create(properties);
+    while (hashMapIterator_hasNext(iterator)) {
+        hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
+
+        properties_writeEscaped(file, hashMapEntry_getKey(entry));
+        fputc('=', file);
+        properties_writeEscaped(file, hashMapEntry_getValue(entry));
+        fputc('\n', file);
+    }
+    hashMapIterator_destroy(iterator);
+
+    fclose(file);
+}
+
+/**
+ * Duplicates properties into a freshly created set.
+ *
+ * Every entry is added through properties_set(), so the new set holds its own
+ * copies of the keys and values. On success *out receives the new set and
+ * CELIX_SUCCESS is returned; when the new set cannot be created CELIX_ENOMEM
+ * is returned and *out is left untouched.
+ */
+celix_status_t properties_copy(properties_pt properties, properties_pt *out) {
+    properties_pt duplicate = properties_create();
+    if (duplicate == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    hash_map_iterator_pt it = hashMapIterator_create(properties);
+    while (hashMapIterator_hasNext(it)) {
+        hash_map_entry_pt current = hashMapIterator_nextEntry(it);
+        char *entryKey = hashMapEntry_getKey(current);
+        char *entryValue = hashMapEntry_getValue(current);
+        properties_set(duplicate, entryKey, entryValue);
+    }
+    hashMapIterator_destroy(it);
+
+    *out = duplicate;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Returns the value stored under key, as delivered by the hash map lookup.
+ * The pointer refers to the value held by the set, not to a copy.
+ */
+const char* properties_get(properties_pt properties, const char* key) {
+    const char* found = hashMap_get(properties, (void*)key);
+    return found;
+}
+
+/**
+ * Returns the value stored under key, or defaultValue when properties_get()
+ * yields NULL for that key.
+ */
+const char* properties_getWithDefault(properties_pt properties, const char* key, const char* defaultValue) {
+    const char* found = properties_get(properties, key);
+    if (found != NULL) {
+        return found;
+    }
+    return defaultValue;
+}
+
+/**
+ * Stores a copy of value under key.
+ *
+ * Key and value are copied with an upper bound of 10240 characters each. When
+ * the key is already present, the existing key is kept, the value is replaced
+ * and the old value is freed. The call does nothing when any argument is NULL
+ * or when a copy cannot be allocated; in that case an existing entry keeps
+ * its current value.
+ */
+void properties_set(properties_pt properties, const char* key, const char* value) {
+    static const size_t maxLength = 1024*10;
+
+    if (properties == NULL || key == NULL || value == NULL) {
+        return;
+    }
+
+    //allocate first, so a failed allocation never disturbs the stored entry
+    char* newValue = strndup(value, maxLength);
+    if (newValue == NULL) {
+        return;
+    }
+
+    hash_map_entry_pt entry = hashMap_getEntry(properties, key);
+    if (entry != NULL) {
+        char* oldKey = hashMapEntry_getKey(entry);
+        char* oldValue = hashMapEntry_getValue(entry);
+        hashMap_put(properties, oldKey, newValue);
+        free(oldValue);
+    } else {
+        char* newKey = strndup(key, maxLength);
+        if (newKey == NULL) {
+            free(newValue);
+            return;
+        }
+        hashMap_put(properties, newKey, newValue);
+    }
+}
+
+/**
+ * Grows the buffer that output currently points into (key or value) by
+ * MALLOC_BLOCK_SIZE bytes as soon as outputPos reaches the last slot of that
+ * buffer, so the next character and the terminating '\0' always fit.
+ *
+ * Returns false when realloc fails. In that case the buffers, their lengths
+ * and output are left unchanged and remain valid, but no further character
+ * may be appended.
+ */
+static bool updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len, int *value_len) {
+    const bool writingKey = (*output == *key);
+    char **buffer = writingKey ? key : value;
+    int *length = writingKey ? key_len : value_len;
+
+    if (outputPos != (*length) - 1) {
+        return true; //still room left
+    }
+
+    //keep the old pointer until realloc succeeded, otherwise it would leak
+    char *grown = realloc(*buffer, (size_t)(*length) + MALLOC_BLOCK_SIZE);
+    if (grown == NULL) {
+        return false;
+    }
+
+    (*length) += MALLOC_BLOCK_SIZE;
+    *buffer = grown;
+    *output = grown;
+    return true;
+}
+
+/**
+ * Appends c to the buffer output points into and grows that buffer if needed.
+ * Returns false when the buffer could not be grown.
+ */
+static bool parseLine_append(char c, char **key, char **value, char **output, int *outputPos, int *key_len, int *value_len) {
+    (*output)[(*outputPos)++] = c;
+    return updateBuffers(key, value, output, *outputPos, key_len, value_len);
+}
+
+/**
+ * Parses one line of a properties file and stores the resulting entry in props.
+ *
+ * - Leading spaces and tabs are skipped.
+ * - The first unescaped '=' or ':' separates the key from the value; later
+ *   ones are part of the value.
+ * - A backslash escapes a following '=', ':', '#' or '!'; two backslashes
+ *   yield a single backslash.
+ * - An unescaped '#' or '!' at the very start of the key marks the line as a
+ *   comment. NOTE: the same happens when it is the very first character after
+ *   the separator (e.g. "key=#x"); such a line is dropped as well.
+ * - Key and value are trimmed with utils_stringTrim() before being stored.
+ *
+ * Lines that contain nothing but spaces and tabs are ignored. When memory runs
+ * out while parsing, the line is skipped and a message is written to stderr.
+ */
+static void parseLine(const char* line, properties_pt props) {
+    //Ignore empty lines
+    if (line[0] == '\n' && line[1] == '\0') {
+        return;
+    }
+
+    int key_len = MALLOC_BLOCK_SIZE;
+    int value_len = MALLOC_BLOCK_SIZE;
+    char *key = calloc(1, key_len);
+    char *value = calloc(1, value_len);
+    char *output = NULL; //NULL until the first non-whitespace character is seen
+    int outputPos = 0;
+    bool precedingCharIsBackslash = false;
+    bool isComment = false;
+    bool ok = (key != NULL && value != NULL);
+
+    for (int linePos = 0; ok && line[linePos] != '\0'; linePos += 1) {
+        const char c = line[linePos];
+
+        if (output == NULL) {
+            if (c == ' ' || c == '\t') {
+                continue; //ignore leading whitespace
+            }
+            output = key;
+        }
+
+        if (c == '=' || c == ':' || c == '#' || c == '!') {
+            if (precedingCharIsBackslash) {
+                //escaped special character
+                ok = parseLine_append(c, &key, &value, &output, &outputPos, &key_len, &value_len);
+                precedingCharIsBackslash = false;
+            }
+            else if (c == '#' || c == '!') {
+                if (outputPos == 0) {
+                    isComment = true;
+                    break;
+                }
+                ok = parseLine_append(c, &key, &value, &output, &outputPos, &key_len, &value_len);
+            }
+            else if (output == value) { //already have a separator
+                ok = parseLine_append(c, &key, &value, &output, &outputPos, &key_len, &value_len);
+            }
+            else {
+                //first separator: terminate the key and continue with the value
+                ok = parseLine_append('\0', &key, &value, &output, &outputPos, &key_len, &value_len);
+                output = value;
+                outputPos = 0;
+            }
+        }
+        else if (c == '\\') {
+            if (precedingCharIsBackslash) { //double backslash -> backslash
+                ok = parseLine_append('\\', &key, &value, &output, &outputPos, &key_len, &value_len);
+            }
+            precedingCharIsBackslash = true;
+        }
+        else { //normal character
+            precedingCharIsBackslash = false;
+            ok = parseLine_append(c, &key, &value, &output, &outputPos, &key_len, &value_len);
+        }
+    }
+
+    if (!ok) {
+        fprintf(stderr, "properties: out of memory while parsing line, line skipped\n");
+    }
+    else if (output != NULL && !isComment) {
+        output[outputPos] = '\0';
+        properties_set(props, utils_stringTrim(key), utils_stringTrim(value));
+    }
+
+    free(key);
+    free(value);
+}