You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by gr...@apache.org on 2017/03/03 17:33:18 UTC
celix git commit: Fixed some Coverity issues
Repository: celix
Updated Branches:
refs/heads/develop 50ed9f5dc -> 3b1491e52
Fixed some Coverity issues
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/3b1491e5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/3b1491e5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/3b1491e5
Branch: refs/heads/develop
Commit: 3b1491e5246f339c25c5397136836c6ed2d0698f
Parents: 50ed9f5
Author: gricciardi <gr...@apache.org>
Authored: Fri Mar 3 18:32:56 2017 +0100
Committer: gricciardi <gr...@apache.org>
Committed: Fri Mar 3 18:32:56 2017 +0100
----------------------------------------------------------------------
dfi/private/src/json_rpc.c | 2 +-
etcdlib/private/src/etcd.c | 726 +++++++++----------
.../phase2a/private/src/phase2a_cmp.c | 1 -
.../phase2b/private/src/phase2b_cmp.c | 1 -
framework/private/src/bundle_archive.c | 1 +
.../publisher/private/src/ps_pub_activator.c | 3 +-
.../pubsub_admin_udp_mc/private/src/large_udp.c | 526 +++++++-------
.../private/src/pubsub_admin_impl.c | 108 +--
.../private/src/topic_publication.c | 4 +-
.../private/src/topic_publication.c | 9 +-
pubsub/pubsub_common/public/src/log_helper.c | 2 +
.../pubsub_common/public/src/pubsub_endpoint.c | 6 +-
.../pubsub_discovery/private/src/etcd_watcher.c | 2 -
.../private/src/pubsub_topology_manager.c | 11 +-
.../private/src/endpoint_discovery_poller.c | 10 +-
.../discovery_etcd/private/src/etcd_watcher.c | 2 +
shell/private/src/uninstall_command.c | 7 +-
shell/private/src/update_command.c | 4 +-
18 files changed, 719 insertions(+), 706 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/dfi/private/src/json_rpc.c
----------------------------------------------------------------------
diff --git a/dfi/private/src/json_rpc.c b/dfi/private/src/json_rpc.c
index 0abbc93..f2b0c56 100644
--- a/dfi/private/src/json_rpc.c
+++ b/dfi/private/src/json_rpc.c
@@ -137,7 +137,7 @@ int jsonRpc_call(dyn_interface_type *intf, void *service, const char *request, c
ffi_sarg returnVal = 1;
if (status == OK) {
- dynFunction_call(func, fp, (void *) &returnVal, args);
+ status = dynFunction_call(func, fp, (void *) &returnVal, args);
}
int funcCallStatus = (int)returnVal;
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/etcdlib/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/etcdlib/private/src/etcd.c b/etcdlib/private/src/etcd.c
index e08db58..97045ee 100644
--- a/etcdlib/private/src/etcd.c
+++ b/etcdlib/private/src/etcd.c
@@ -65,16 +65,16 @@ static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, voi
* etcd_init
*/
int etcd_init(const char* server, int port, int flags) {
- int status = 0;
- etcd_server = server;
- etcd_port = port;
+ int status = 0;
+ etcd_server = server;
+ etcd_port = port;
- if ((flags & ETCDLIB_NO_CURL_INITIALIZATION) == 0) {
- //NO_CURL_INITIALIZATION flag not set
- status = curl_global_init(CURL_GLOBAL_ALL);
- }
+ if ((flags & ETCDLIB_NO_CURL_INITIALIZATION) == 0) {
+ //NO_CURL_INITIALIZATION flag not set
+ status = curl_global_init(CURL_GLOBAL_ALL);
+ }
- return status;
+ return status;
}
@@ -82,212 +82,212 @@ int etcd_init(const char* server, int port, int flags) {
* etcd_get
*/
int etcd_get(const char* key, char** value, int* modifiedIndex) {
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_value = NULL;
- json_t* js_modifiedIndex = NULL;
- json_error_t error;
- int res = -1;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- int retVal = -1;
- char *url;
- asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
- free(url);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- js_modifiedIndex = json_object_get(js_node,
- ETCD_JSON_MODIFIEDINDEX);
-
- if (js_modifiedIndex != NULL && js_value != NULL) {
- if (modifiedIndex) {
- *modifiedIndex = json_integer_value(js_modifiedIndex);
- }
- *value = strdup(json_string_value(js_value));
- retVal = 0;
- }
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
- if(retVal != 0) {
- value = NULL;
- }
- return retVal;
+ json_t* js_root = NULL;
+ json_t* js_node = NULL;
+ json_t* js_value = NULL;
+ json_t* js_modifiedIndex = NULL;
+ json_error_t error;
+ int res = -1;
+ struct MemoryStruct reply;
+
+ reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+ reply.size = 0; /* no data at this point */
+
+ int retVal = -1;
+ char *url;
+ asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+ res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+ free(url);
+
+ if (res == CURLE_OK) {
+ js_root = json_loads(reply.memory, 0, &error);
+
+ if (js_root != NULL) {
+ js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ }
+ if (js_node != NULL) {
+ js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+ js_modifiedIndex = json_object_get(js_node,
+ ETCD_JSON_MODIFIEDINDEX);
+
+ if (js_modifiedIndex != NULL && js_value != NULL) {
+ if (modifiedIndex) {
+ *modifiedIndex = json_integer_value(js_modifiedIndex);
+ }
+ *value = strdup(json_string_value(js_value));
+ retVal = 0;
+ }
+ }
+ if (js_root != NULL) {
+ json_decref(js_root);
+ }
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+ if(retVal != 0) {
+ *value = NULL;
+ }
+ return retVal;
}
static int etcd_get_recursive_values(json_t* js_root, etcd_key_value_callback callback, void *arg, json_int_t *mod_index) {
- json_t *js_nodes;
- if ((js_nodes = json_object_get(js_root, ETCD_JSON_NODES)) != NULL) {
- // subarray
- if (json_is_array(js_nodes)) {
- int len = json_array_size(js_nodes);
- for (int i = 0; i < len; i++) {
- json_t *js_object = json_array_get(js_nodes, i);
- json_t *js_mod_index = json_object_get(js_object, ETCD_JSON_MODIFIEDINDEX);
-
- if(js_mod_index != NULL) {
- json_int_t index = json_integer_value(js_mod_index);
- if(*mod_index < index) {
- *mod_index = index;
- }
- } else {
- printf("[ETCDLIB] Error: No INDEX found for key!\n");
- }
-
- if (json_object_get(js_object, ETCD_JSON_NODES)) {
- // node contains nodes
- etcd_get_recursive_values(js_object, callback, arg, mod_index);
- } else {
- json_t* js_key = json_object_get(js_object, ETCD_JSON_KEY);
- json_t* js_value = json_object_get(js_object, ETCD_JSON_VALUE);
-
- if (js_key && js_value) {
- if (!json_object_get(js_object, ETCD_JSON_DIR)) {
- callback(json_string_value(js_key), json_string_value(js_value), arg);
- }
- } //else empty etcd directory, not an error.
-
- }
- }
- } else {
- fprintf(stderr, "[ETCDLIB] Error: misformatted JSON: nodes element is not an array !!\n");
- }
- } else {
- fprintf(stderr, "[ETCDLIB] Error: nodes element not found!!\n");
- }
-
- return (*index > 0 ? 0 : 1);
+ json_t *js_nodes;
+ if ((js_nodes = json_object_get(js_root, ETCD_JSON_NODES)) != NULL) {
+ // subarray
+ if (json_is_array(js_nodes)) {
+ int len = json_array_size(js_nodes);
+ for (int i = 0; i < len; i++) {
+ json_t *js_object = json_array_get(js_nodes, i);
+ json_t *js_mod_index = json_object_get(js_object, ETCD_JSON_MODIFIEDINDEX);
+
+ if(js_mod_index != NULL) {
+ json_int_t index = json_integer_value(js_mod_index);
+ if(*mod_index < index) {
+ *mod_index = index;
+ }
+ } else {
+ printf("[ETCDLIB] Error: No INDEX found for key!\n");
+ }
+
+ if (json_object_get(js_object, ETCD_JSON_NODES)) {
+ // node contains nodes
+ etcd_get_recursive_values(js_object, callback, arg, mod_index);
+ } else {
+ json_t* js_key = json_object_get(js_object, ETCD_JSON_KEY);
+ json_t* js_value = json_object_get(js_object, ETCD_JSON_VALUE);
+
+ if (js_key && js_value) {
+ if (!json_object_get(js_object, ETCD_JSON_DIR)) {
+ callback(json_string_value(js_key), json_string_value(js_value), arg);
+ }
+ } //else empty etcd directory, not an error.
+
+ }
+ }
+ } else {
+ fprintf(stderr, "[ETCDLIB] Error: misformatted JSON: nodes element is not an array !!\n");
+ }
+ } else {
+ fprintf(stderr, "[ETCDLIB] Error: nodes element not found!!\n");
+ }
+
+ return (*mod_index > 0 ? 0 : 1);
}
/**
* etcd_get_directory
*/
int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void* arg, long long* modifiedIndex) {
- json_t* js_root = NULL;
- json_t* js_rootnode = NULL;
-
- json_error_t error;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- int retVal = 0;
- char *url;
-
- asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory);
-
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
- free(url);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
- if (js_root != NULL) {
- js_rootnode = json_object_get(js_root, ETCD_JSON_NODE);
- } else {
- retVal = -1;
- fprintf(stderr, "[ETCDLIB] Error: %s in js_root not found", ETCD_JSON_NODE);
- }
- if (js_rootnode != NULL) {
- *modifiedIndex = 0;
- retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex);
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
+ json_t* js_root = NULL;
+ json_t* js_rootnode = NULL;
+
+ json_error_t error;
+ int res;
+ struct MemoryStruct reply;
+
+ reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+ reply.size = 0; /* no data at this point */
+
+ int retVal = 0;
+ char *url;
+
+ asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory);
+
+ res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+ free(url);
+
+ if (res == CURLE_OK) {
+ js_root = json_loads(reply.memory, 0, &error);
+ if (js_root != NULL) {
+ js_rootnode = json_object_get(js_root, ETCD_JSON_NODE);
+ } else {
+ retVal = -1;
+ fprintf(stderr, "[ETCDLIB] Error: %s in js_root not found", ETCD_JSON_NODE);
+ }
+ if (js_rootnode != NULL) {
+ *modifiedIndex = 0;
+ retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex);
+ }
+ if (js_root != NULL) {
+ json_decref(js_root);
+ }
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+
+ return retVal;
}
/**
* etcd_set
*/
int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_value = NULL;
- int retVal = -1;
- char *url;
- size_t req_len = strlen(value) + MAX_OVERHEAD_LENGTH;
- char request[req_len];
- char* requestPtr = request;
- int res;
- struct MemoryStruct reply;
-
- /* Skip leading '/', etcd cannot handle this. */
- while(*key == '/') {
- key++;
- }
-
- reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-
- requestPtr += snprintf(requestPtr, req_len, "value=%s", value);
- if (ttl > 0) {
- requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
- }
-
- if (prevExist) {
- requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true");
- }
-
- res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
-
- if(url) {
- free(url);
- }
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- }
- if (js_value != NULL && json_is_string(js_value)) {
- if(strcmp(json_string_value(js_value), value) == 0) {
- retVal = 0;
- }
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
+ json_error_t error;
+ json_t* js_root = NULL;
+ json_t* js_node = NULL;
+ json_t* js_value = NULL;
+ int retVal = -1;
+ char *url;
+ size_t req_len = strlen(value) + MAX_OVERHEAD_LENGTH;
+ char request[req_len];
+ char* requestPtr = request;
+ int res;
+ struct MemoryStruct reply;
+
+ /* Skip leading '/', etcd cannot handle this. */
+ while(*key == '/') {
+ key++;
+ }
+
+ reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */
+ reply.size = 0; /* no data at this point */
+
+ asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+
+ requestPtr += snprintf(requestPtr, req_len, "value=%s", value);
+ if (ttl > 0) {
+ requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
+ }
+
+ if (prevExist) {
+ requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true");
+ }
+
+ res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
+
+ if(url) {
+ free(url);
+ }
+
+ if (res == CURLE_OK) {
+ js_root = json_loads(reply.memory, 0, &error);
+
+ if (js_root != NULL) {
+ js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ }
+ if (js_node != NULL) {
+ js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+ }
+ if (js_value != NULL && json_is_string(js_value)) {
+ if(strcmp(json_string_value(js_value), value) == 0) {
+ retVal = 0;
+ }
+ }
+ if (js_root != NULL) {
+ json_decref(js_root);
+ }
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+
+ return retVal;
}
@@ -295,24 +295,22 @@ int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
* etcd_set_with_check
*/
int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write) {
- char *etcd_value;
- int result = 0;
- if (etcd_get(key, &etcd_value, NULL) == 0) {
- if (strcmp(etcd_value, value) != 0) {
- fprintf(stderr, "[ETCDLIB] WARNING: value already exists and is different\n");
- fprintf(stderr, " key = %s\n", key);
- fprintf(stderr, " old value = %s\n", etcd_value);
- fprintf(stderr, " new value = %s\n", value);
- result = -1;
- }
- if (etcd_value) {
- free(etcd_value);
- }
- }
- if(always_write || !result) {
- result = etcd_set(key, value, ttl, false);
- }
- return result;
+ char *etcd_value;
+ int result = 0;
+ if (etcd_get(key, &etcd_value, NULL) == 0) {
+ if (strcmp(etcd_value, value) != 0) {
+ fprintf(stderr, "[ETCDLIB] WARNING: value already exists and is different\n");
+ fprintf(stderr, " key = %s\n", key);
+ fprintf(stderr, " old value = %s\n", etcd_value);
+ fprintf(stderr, " new value = %s\n", value);
+ result = -1;
+ free(etcd_value);
+ }
+ }
+ if(always_write || !result) {
+ result = etcd_set(key, value, ttl, false);
+ }
+ return result;
}
@@ -320,168 +318,168 @@ int etcd_set_with_check(const char* key, const char* value, int ttl, bool always
* etcd_watch
*/
int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_prevNode = NULL;
- json_t* js_action = NULL;
- json_t* js_value = NULL;
- json_t* js_rkey = NULL;
- json_t* js_prevValue = NULL;
- json_t* js_modIndex = NULL;
- int retVal = -1;
- char *url = NULL;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- if (index != 0)
- asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index);
- else
- asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
- if(url)
- free(url);
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_action = json_object_get(js_root, ETCD_JSON_ACTION);
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
- retVal = 0;
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if (js_node != NULL) {
- js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if ((prevValue != NULL) && (js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-
- *prevValue = strdup(json_string_value(js_prevValue));
- }
- if(modifiedIndex != NULL) {
- if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
- *modifiedIndex = json_integer_value(js_modIndex);
- } else {
- *modifiedIndex = index;
- }
- }
- if ((rkey != NULL) && (js_rkey != NULL) && (json_is_string(js_rkey))) {
- *rkey = strdup(json_string_value(js_rkey));
-
- }
- if ((action != NULL) && (js_action != NULL) && (json_is_string(js_action))) {
- *action = strdup(json_string_value(js_action));
- }
- if ((value != NULL) && (js_value != NULL) && (json_is_string(js_value))) {
- *value = strdup(json_string_value(js_value));
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
-
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
+ json_error_t error;
+ json_t* js_root = NULL;
+ json_t* js_node = NULL;
+ json_t* js_prevNode = NULL;
+ json_t* js_action = NULL;
+ json_t* js_value = NULL;
+ json_t* js_rkey = NULL;
+ json_t* js_prevValue = NULL;
+ json_t* js_modIndex = NULL;
+ int retVal = -1;
+ char *url = NULL;
+ int res;
+ struct MemoryStruct reply;
+
+ reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+ reply.size = 0; /* no data at this point */
+
+ if (index != 0)
+ asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index);
+ else
+ asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
+ res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+ if(url)
+ free(url);
+ if (res == CURLE_OK) {
+ js_root = json_loads(reply.memory, 0, &error);
+
+ if (js_root != NULL) {
+ js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+ js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+ retVal = 0;
+ }
+ if (js_prevNode != NULL) {
+ js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+ }
+ if (js_node != NULL) {
+ js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
+ js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+ js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
+ }
+ if (js_prevNode != NULL) {
+ js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+ }
+ if ((prevValue != NULL) && (js_prevValue != NULL) && (json_is_string(js_prevValue))) {
+
+ *prevValue = strdup(json_string_value(js_prevValue));
+ }
+ if(modifiedIndex != NULL) {
+ if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
+ *modifiedIndex = json_integer_value(js_modIndex);
+ } else {
+ *modifiedIndex = index;
+ }
+ }
+ if ((rkey != NULL) && (js_rkey != NULL) && (json_is_string(js_rkey))) {
+ *rkey = strdup(json_string_value(js_rkey));
+
+ }
+ if ((action != NULL) && (js_action != NULL) && (json_is_string(js_action))) {
+ *action = strdup(json_string_value(js_action));
+ }
+ if ((value != NULL) && (js_value != NULL) && (json_is_string(js_value))) {
+ *value = strdup(json_string_value(js_value));
+ }
+ if (js_root != NULL) {
+ json_decref(js_root);
+ }
+
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+
+ return retVal;
}
/**
* etcd_del
*/
int etcd_del(const char* key) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- int retVal = -1;
- char *url;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key);
- res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply);
- free(url);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
-
- if (js_node != NULL) {
- retVal = 0;
- }
-
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
+ json_error_t error;
+ json_t* js_root = NULL;
+ json_t* js_node = NULL;
+ int retVal = -1;
+ char *url;
+ int res;
+ struct MemoryStruct reply;
+
+ reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+ reply.size = 0; /* no data at this point */
+
+ asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key);
+ res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply);
+ free(url);
+
+ if (res == CURLE_OK) {
+ js_root = json_loads(reply.memory, 0, &error);
+ if (js_root != NULL) {
+ js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ }
+
+ if (js_node != NULL) {
+ retVal = 0;
+ }
+
+ if (js_root != NULL) {
+ json_decref(js_root);
+ }
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+
+ return retVal;
}
static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
- size_t realsize = size * nmemb;
- struct MemoryStruct *mem = (struct MemoryStruct *) userp;
+ size_t realsize = size * nmemb;
+ struct MemoryStruct *mem = (struct MemoryStruct *) userp;
- mem->memory = realloc(mem->memory, mem->size + realsize + 1);
- if (mem->memory == NULL) {
- /* out of memory! */
- fprintf(stderr, "[ETCDLIB] Error: not enough memory (realloc returned NULL)\n");
- return 0;
- }
+ mem->memory = realloc(mem->memory, mem->size + realsize + 1);
+ if (mem->memory == NULL) {
+ /* out of memory! */
+ fprintf(stderr, "[ETCDLIB] Error: not enough memory (realloc returned NULL)\n");
+ return 0;
+ }
- memcpy(&(mem->memory[mem->size]), contents, realsize);
- mem->size += realsize;
- mem->memory[mem->size] = 0;
+ memcpy(&(mem->memory[mem->size]), contents, realsize);
+ mem->size += realsize;
+ mem->memory[mem->size] = 0;
- return realsize;
+ return realsize;
}
static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
- CURL *curl = NULL;
- CURLcode res = 0;
- curl = curl_easy_init();
- curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
- curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
- curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
- curl_easy_setopt(curl, CURLOPT_URL, url);
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
-
- if (request == PUT) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
- curl_easy_setopt(curl, CURLOPT_POST, 1L);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
- } else if (request == DELETE) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
- } else if (request == GET) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
- }
-
- res = curl_easy_perform(curl);
- curl_easy_cleanup(curl);
-
- return res;
+ CURL *curl = NULL;
+ CURLcode res = 0;
+ curl = curl_easy_init();
+ curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+ curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
+ curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
+
+ if (request == PUT) {
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
+ curl_easy_setopt(curl, CURLOPT_POST, 1L);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
+ } else if (request == DELETE) {
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
+ } else if (request == GET) {
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
+ }
+
+ res = curl_easy_perform(curl);
+ curl_easy_cleanup(curl);
+
+ return res;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/examples/dm_example/phase2a/private/src/phase2a_cmp.c
----------------------------------------------------------------------
diff --git a/examples/dm_example/phase2a/private/src/phase2a_cmp.c b/examples/dm_example/phase2a/private/src/phase2a_cmp.c
index e0b1cc8..d115b69 100644
--- a/examples/dm_example/phase2a/private/src/phase2a_cmp.c
+++ b/examples/dm_example/phase2a/private/src/phase2a_cmp.c
@@ -79,7 +79,6 @@ int phase2a_deinit(phase2a_cmp_t *cmp) {
}
void phase2a_destroy(phase2a_cmp_t *cmp) {
- celixThreadMutex_lock(&cmp->mutex);
celixThreadMutex_destroy(&cmp->mutex);
free(cmp);
printf("destroy phase2a\n");
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/examples/dm_example/phase2b/private/src/phase2b_cmp.c
----------------------------------------------------------------------
diff --git a/examples/dm_example/phase2b/private/src/phase2b_cmp.c b/examples/dm_example/phase2b/private/src/phase2b_cmp.c
index ccaa94d..a74dcfa 100644
--- a/examples/dm_example/phase2b/private/src/phase2b_cmp.c
+++ b/examples/dm_example/phase2b/private/src/phase2b_cmp.c
@@ -79,7 +79,6 @@ int phase2b_deinit(phase2b_cmp_t *cmp) {
}
void phase2b_destroy(phase2b_cmp_t *cmp) {
- celixThreadMutex_lock(&cmp->mutex);
celixThreadMutex_destroy(&cmp->mutex);
free(cmp);
printf("destroy phase2b\n");
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/framework/private/src/bundle_archive.c
----------------------------------------------------------------------
diff --git a/framework/private/src/bundle_archive.c b/framework/private/src/bundle_archive.c
index 266d970..3a58b27 100644
--- a/framework/private/src/bundle_archive.c
+++ b/framework/private/src/bundle_archive.c
@@ -502,6 +502,7 @@ static celix_status_t bundleArchive_readLastModified(bundle_archive_pt archive,
char timeStr[20];
int year, month, day, hours, minutes, seconds;
struct tm tm_time;
+ memset(&tm_time,0,sizeof(struct tm));
if (fgets(timeStr, sizeof(timeStr), lastModifiedFile) == NULL) {
status = CELIX_FILE_IO_EXCEPTION;
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index e4a8ba8..e6c7a3b 100644
--- a/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -45,7 +45,6 @@ struct publisherActivator {
};
celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- struct publisherActivator * act = malloc(sizeof(*act));
const char* fwUUID = NULL;
@@ -55,6 +54,8 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
return CELIX_INVALID_BUNDLE_CONTEXT;
}
+ struct publisherActivator * act = malloc(sizeof(*act));
+
bundle_pt bundle = NULL;
long bundleId = 0;
bundleContext_getBundle(context,&bundle);
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c b/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
index e5cd5b5..8e838be 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c
@@ -44,35 +44,35 @@
//#define NO_IP_FRAGMENTATION
struct largeUdp {
- unsigned int maxNrLists;
- array_list_pt udpPartLists;
- pthread_mutex_t dbLock;
+ unsigned int maxNrLists;
+ array_list_pt udpPartLists;
+ pthread_mutex_t dbLock;
};
typedef struct udpPartList {
- unsigned int msg_ident;
- unsigned int msg_size;
- unsigned int nrPartsRemaining;
- char *data;
+ unsigned int msg_ident;
+ unsigned int msg_size;
+ unsigned int nrPartsRemaining;
+ char *data;
} *udpPartList_pt;
typedef struct msg_part_header {
- unsigned int msg_ident;
- unsigned int total_msg_size;
- unsigned int part_msg_size;
- unsigned int offset;
+ unsigned int msg_ident;
+ unsigned int total_msg_size;
+ unsigned int part_msg_size;
+ unsigned int offset;
} msg_part_header_t;
#ifdef NO_IP_FRAGMENTATION
- #define MAX_PART_SIZE (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
+#define MAX_PART_SIZE (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
#else
- #define MAX_PART_SIZE (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
+#define MAX_PART_SIZE (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
#endif
typedef struct msg_part {
- msg_part_header_t header;
- char data[MAX_PART_SIZE];
+ msg_part_header_t header;
+ char data[MAX_PART_SIZE];
} msg_part_t;
//
@@ -80,18 +80,18 @@ typedef struct msg_part {
//
largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
{
- printf("## Creating large UDP\n");
- largeUdp_pt handle = calloc(sizeof(*handle), 1);
- if(handle != NULL) {
- handle->maxNrLists = maxNrUdpReceptions;
- if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
- free(handle);
- handle = NULL;
- }
- pthread_mutex_init(&handle->dbLock, 0);
- }
-
- return handle;
+ printf("## Creating large UDP\n");
+ largeUdp_pt handle = calloc(sizeof(*handle), 1);
+ if(handle != NULL) {
+ handle->maxNrLists = maxNrUdpReceptions;
+ if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
+ free(handle);
+ handle = NULL;
+ }
+ pthread_mutex_init(&handle->dbLock, 0);
+ }
+
+ return handle;
}
//
@@ -99,27 +99,27 @@ largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
//
void largeUdp_destroy(largeUdp_pt handle)
{
- printf("### Destroying large UDP\n");
- if(handle != NULL) {
- pthread_mutex_lock(&handle->dbLock);
- int nrUdpLists = arrayList_size(handle->udpPartLists);
- int i;
- for(i=0; i < nrUdpLists; i++) {
- udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, i);
- if(udpPartList) {
- if(udpPartList->data) {
- free(udpPartList->data);
- udpPartList->data = NULL;
- }
- free(udpPartList);
- }
- }
- arrayList_destroy(handle->udpPartLists);
- handle->udpPartLists = NULL;
- pthread_mutex_unlock(&handle->dbLock);
- pthread_mutex_destroy(&handle->dbLock);
- free(handle);
- }
+ printf("### Destroying large UDP\n");
+ if(handle != NULL) {
+ pthread_mutex_lock(&handle->dbLock);
+ int nrUdpLists = arrayList_size(handle->udpPartLists);
+ int i;
+ for(i=0; i < nrUdpLists; i++) {
+ udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, i);
+ if(udpPartList) {
+ if(udpPartList->data) {
+ free(udpPartList->data);
+ udpPartList->data = NULL;
+ }
+ free(udpPartList);
+ }
+ }
+ arrayList_destroy(handle->udpPartLists);
+ handle->udpPartLists = NULL;
+ pthread_mutex_unlock(&handle->dbLock);
+ pthread_mutex_destroy(&handle->dbLock);
+ free(handle);
+ }
}
//
@@ -127,72 +127,72 @@ void largeUdp_destroy(largeUdp_pt handle)
//
int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
{
- int n;
- int result = 0;
- msg_part_header_t header;
-
- int written = 0;
- header.msg_ident = rand();
- header.total_msg_size = 0;
- for(n = 0; n < len ;n++) {
- header.total_msg_size += largeMsg_iovec[n].iov_len;
- }
- int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
-
- struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
- struct msghdr msg;
- msg.msg_name = dest_addr;
- msg.msg_namelen = addrlen;
- msg.msg_flags = 0;
- msg.msg_iov = msg_iovec;
- msg.msg_iovlen = 2; // header and payload;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- msg.msg_iov[0].iov_base = &header;
- msg.msg_iov[0].iov_len = sizeof(header);
-
- for(n = 0; n < nr_buffers; n++) {
-
- header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE));
- header.offset = n * MAX_PART_SIZE;
- int remainingOffset = header.offset;
- int recvPart = 0;
- // find the start of the part
- while(remainingOffset > largeMsg_iovec[recvPart].iov_len) {
- remainingOffset -= largeMsg_iovec[recvPart].iov_len;
- recvPart++;
- }
- int remainingData = header.part_msg_size;
- int sendPart = 1;
- msg.msg_iovlen = 1;
-
- // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
- while(remainingData > 0) {
- int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? (largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData);
- msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset;
- msg.msg_iov[sendPart].iov_len = partLen;
- remainingData -= partLen;
- remainingOffset = 0;
- sendPart++;
- recvPart++;
- msg.msg_iovlen++;
- }
- int tmp, tmptot;
- for(tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) {
- tmptot += msg.msg_iov[tmp].iov_len;
- }
-
- int w = sendmsg(fd, &msg, 0);
- if(w == -1) {
- perror("send()");
- result = -1;
- break;
- }
- written += w;
- }
-
- return (result == 0 ? written : result);
+ int n;
+ int result = 0;
+ msg_part_header_t header;
+
+ int written = 0;
+ header.msg_ident = rand();
+ header.total_msg_size = 0;
+ for(n = 0; n < len ;n++) {
+ header.total_msg_size += largeMsg_iovec[n].iov_len;
+ }
+ int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
+
+ struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
+ struct msghdr msg;
+ msg.msg_name = dest_addr;
+ msg.msg_namelen = addrlen;
+ msg.msg_flags = 0;
+ msg.msg_iov = msg_iovec;
+ msg.msg_iovlen = 2; // header and payload;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+
+ msg.msg_iov[0].iov_base = &header;
+ msg.msg_iov[0].iov_len = sizeof(header);
+
+ for(n = 0; n < nr_buffers; n++) {
+
+ header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE));
+ header.offset = n * MAX_PART_SIZE;
+ int remainingOffset = header.offset;
+ int recvPart = 0;
+ // find the start of the part
+ while(remainingOffset > largeMsg_iovec[recvPart].iov_len) {
+ remainingOffset -= largeMsg_iovec[recvPart].iov_len;
+ recvPart++;
+ }
+ int remainingData = header.part_msg_size;
+ int sendPart = 1;
+ msg.msg_iovlen = 1;
+
+ // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
+ while(remainingData > 0) {
+ int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? (largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData);
+ msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset;
+ msg.msg_iov[sendPart].iov_len = partLen;
+ remainingData -= partLen;
+ remainingOffset = 0;
+ sendPart++;
+ recvPart++;
+ msg.msg_iovlen++;
+ }
+ int tmp, tmptot;
+ for(tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) {
+ tmptot += msg.msg_iov[tmp].iov_len;
+ }
+
+ int w = sendmsg(fd, &msg, 0);
+ if(w == -1) {
+ perror("send()");
+ result = -1;
+ break;
+ }
+ written += w;
+ }
+
+ return (result == 0 ? written : result);
}
//
@@ -200,46 +200,46 @@ int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, i
//
int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
{
- int n;
- int nr_buffers = (count / MAX_PART_SIZE) + 1;
- int result = 0;
- msg_part_header_t header;
-
- int written = 0;
- header.msg_ident = rand();
- header.total_msg_size = count;
- char *databuf = buf;
-
- struct iovec msg_iovec[2];
- struct msghdr msg;
- msg.msg_name = dest_addr;
- msg.msg_namelen = addrlen;
- msg.msg_flags = 0;
- msg.msg_iov = msg_iovec;
- msg.msg_iovlen = 2; // header and payload;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- msg.msg_iov[0].iov_base = &header;
- msg.msg_iov[0].iov_len = sizeof(header);
-
- for(n = 0; n < nr_buffers; n++) {
-
- header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE));
- header.offset = n * MAX_PART_SIZE;
- msg.msg_iov[1].iov_base = &databuf[header.offset];
- msg.msg_iov[1].iov_len = header.part_msg_size;
- int w = sendmsg(fd, &msg, 0);
- if(w == -1) {
- perror("send()");
- result = -1;
- break;
- }
- written += w;
- //usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost)
- }
-
- return (result == 0 ? written : result);
+ int n;
+ int nr_buffers = (count / MAX_PART_SIZE) + 1;
+ int result = 0;
+ msg_part_header_t header;
+
+ int written = 0;
+ header.msg_ident = rand();
+ header.total_msg_size = count;
+ char *databuf = buf;
+
+ struct iovec msg_iovec[2];
+ struct msghdr msg;
+ msg.msg_name = dest_addr;
+ msg.msg_namelen = addrlen;
+ msg.msg_flags = 0;
+ msg.msg_iov = msg_iovec;
+ msg.msg_iovlen = 2; // header and payload;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+
+ msg.msg_iov[0].iov_base = &header;
+ msg.msg_iov[0].iov_len = sizeof(header);
+
+ for(n = 0; n < nr_buffers; n++) {
+
+ header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE));
+ header.offset = n * MAX_PART_SIZE;
+ msg.msg_iov[1].iov_base = &databuf[header.offset];
+ msg.msg_iov[1].iov_len = header.part_msg_size;
+ int w = sendmsg(fd, &msg, 0);
+ if(w == -1) {
+ perror("send()");
+ result = -1;
+ break;
+ }
+ written += w;
+ //usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost)
+ }
+
+ return (result == 0 ? written : result);
}
//
@@ -247,98 +247,106 @@ int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int fla
// If the message is completely reassembled true is returned and the index and size have valid values
//
bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) {
- msg_part_header_t header;
- int result = false;
- // Only read the header, we don't know yet where to store the payload
- if(recv(fd, &header, sizeof(header), MSG_PEEK) < 0) {
- perror("read()");
- return false;
- }
-
- struct iovec msg_vec[2];
- struct msghdr msg;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_flags = 0;
- msg.msg_iov = msg_vec;
- msg.msg_iovlen = 2; // header and payload;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- msg.msg_iov[0].iov_base = &header;
- msg.msg_iov[0].iov_len = sizeof(header);
-
- pthread_mutex_lock(&handle->dbLock);
-
- int nrUdpLists = arrayList_size(handle->udpPartLists);
- int i;
- bool found = false;
- for(i = 0; i < nrUdpLists; i++) {
- udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
- if(udpPartList->msg_ident == header.msg_ident) {
- found = true;
-
- //sanity check
- if(udpPartList->msg_size != header.total_msg_size) {
- // Corruption occurred. Remove the existing administration and build up a new one.
- arrayList_remove(handle->udpPartLists, i);
- free(udpPartList->data);
- free(udpPartList);
- found = false;
- break;
- }
-
- msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
- msg.msg_iov[1].iov_len = header.part_msg_size;
- recvmsg(fd, &msg, 0);
-
- udpPartList->nrPartsRemaining--;
- if(udpPartList->nrPartsRemaining == 0) {
- *index = i;
- *size = udpPartList->msg_size;
- result = true;
- break;
- } else {
- result = false; // not complete
- break;
- }
- }
- }
-
- if(found == false) {
- udpPartList_pt udpPartList = NULL;
- if(nrUdpLists == handle->maxNrLists) {
- // remove list at index 0
- udpPartList = arrayList_remove(handle->udpPartLists, 0);
- fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining );
- free(udpPartList->data);
- free(udpPartList);
- nrUdpLists--;
- }
- udpPartList = calloc(sizeof(*udpPartList), 1);
- udpPartList->msg_ident = header.msg_ident;
- udpPartList->msg_size = header.total_msg_size;
- udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
- udpPartList->data = calloc(sizeof(char), header.total_msg_size);
-
- msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
- msg.msg_iov[1].iov_len = header.part_msg_size;
- recvmsg(fd, &msg, 0);
-
- arrayList_add(handle->udpPartLists, udpPartList);
-
- if(udpPartList->nrPartsRemaining == 0) {
- *index = nrUdpLists;
- *size = udpPartList->msg_size;
- result = true;
- } else {
- result = false;
- }
-
- }
- pthread_mutex_unlock(&handle->dbLock);
-
- return result;
+ msg_part_header_t header;
+ int result = false;
+ // Only read the header, we don't know yet where to store the payload
+ if(recv(fd, &header, sizeof(header), MSG_PEEK) < 0) {
+ perror("read()");
+ return false;
+ }
+
+ struct iovec msg_vec[2];
+ struct msghdr msg;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_flags = 0;
+ msg.msg_iov = msg_vec;
+ msg.msg_iovlen = 2; // header and payload;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+
+ msg.msg_iov[0].iov_base = &header;
+ msg.msg_iov[0].iov_len = sizeof(header);
+
+ pthread_mutex_lock(&handle->dbLock);
+
+ int nrUdpLists = arrayList_size(handle->udpPartLists);
+ int i;
+ bool found = false;
+ for(i = 0; i < nrUdpLists; i++) {
+ udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
+ if(udpPartList->msg_ident == header.msg_ident) {
+ found = true;
+
+ //sanity check
+ if(udpPartList->msg_size != header.total_msg_size) {
+ // Corruption occurred. Remove the existing administration and build up a new one.
+ arrayList_remove(handle->udpPartLists, i);
+ free(udpPartList->data);
+ free(udpPartList);
+ found = false;
+ break;
+ }
+
+ msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
+ msg.msg_iov[1].iov_len = header.part_msg_size;
+ if(recvmsg(fd, &msg, 0)<0){
+ found=true;
+ result=false;
+ break;
+ }
+
+ udpPartList->nrPartsRemaining--;
+ if(udpPartList->nrPartsRemaining == 0) {
+ *index = i;
+ *size = udpPartList->msg_size;
+ result = true;
+ break;
+ } else {
+ result = false; // not complete
+ break;
+ }
+ }
+ }
+
+ if(found == false) {
+ udpPartList_pt udpPartList = NULL;
+ if(nrUdpLists == handle->maxNrLists) {
+ // remove list at index 0
+ udpPartList = arrayList_remove(handle->udpPartLists, 0);
+ fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining );
+ free(udpPartList->data);
+ free(udpPartList);
+ nrUdpLists--;
+ }
+ udpPartList = calloc(sizeof(*udpPartList), 1);
+ udpPartList->msg_ident = header.msg_ident;
+ udpPartList->msg_size = header.total_msg_size;
+ udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
+ udpPartList->data = calloc(sizeof(char), header.total_msg_size);
+
+ msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
+ msg.msg_iov[1].iov_len = header.part_msg_size;
+ if(recvmsg(fd, &msg, 0)<0){
+ result=false;
+ }
+ else{
+ arrayList_add(handle->udpPartLists, udpPartList);
+
+ if(udpPartList->nrPartsRemaining == 0) {
+ *index = nrUdpLists;
+ *size = udpPartList->msg_size;
+ result = true;
+ } else {
+ result = false;
+ }
+ }
+
+ }
+
+ pthread_mutex_unlock(&handle->dbLock);
+
+ return result;
}
//
@@ -346,17 +354,17 @@ bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, uns
//
int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size)
{
- int result = 0;
- pthread_mutex_lock(&handle->dbLock);
-
- udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, index);
- if(udpPartList) {
- *buffer = udpPartList->data;
- free(udpPartList);
- } else {
- result = -1;
- }
- pthread_mutex_unlock(&handle->dbLock);
-
- return result;
+ int result = 0;
+ pthread_mutex_lock(&handle->dbLock);
+
+ udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, index);
+ if(udpPartList) {
+ *buffer = udpPartList->data;
+ free(udpPartList);
+ } else {
+ result = -1;
+ }
+ pthread_mutex_unlock(&handle->dbLock);
+
+ return result;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index bd3bb2f..cd0658b 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -98,17 +98,17 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
const char *mc_ip_prop = NULL;
bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
if(mc_ip_prop) {
- mc_ip = strdup(mc_ip_prop);
+ mc_ip = strdup(mc_ip_prop);
}
#ifndef ANDROID
if (mc_ip == NULL) {
- const char *mc_prefix = NULL;
- const char *interface = NULL;
- int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
- bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
- if(mc_prefix == NULL) {
- mc_prefix = DEFAULT_MC_PREFIX;
- }
+ const char *mc_prefix = NULL;
+ const char *interface = NULL;
+ int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
+ bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
+ if(mc_prefix == NULL) {
+ mc_prefix = DEFAULT_MC_PREFIX;
+ }
bundleContext_getProperty(context, PSA_ITF, &interface);
if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
@@ -117,20 +117,19 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
printf("IP Detected : %s\n", if_ip);
if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not parse IP address %s", if_ip);
- b2 = 1;
- b3 = 1;
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not parse IP address %s", if_ip);
+ b2 = 1;
+ b3 = 1;
}
- asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
-
- int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
- if(sendSocket == -1) {
- perror("pubsubAdmin_create:socket");
- status = CELIX_SERVICE_EXCEPTION;
- }
+ asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
- if (status == CELIX_SUCCESS){
+ int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+ if(sendSocket == -1) {
+ perror("pubsubAdmin_create:socket");
+ status = CELIX_SERVICE_EXCEPTION;
+ }
+ else{
char loop = 1;
if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
@@ -144,20 +143,27 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
status = CELIX_SERVICE_EXCEPTION;
}
+ else{
+ (*admin)->sendSocket = sendSocket;
+ }
+ }
- (*admin)->sendSocket = sendSocket;
+ if(status!=CELIX_SUCCESS){
+ close(sendSocket);
}
- }
+ }
+
+
}
#endif
- if (if_ip != NULL) {
- logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s as interface for multicast communication", if_ip);
- (*admin)->ifIpAddress = if_ip;
- } else {
- (*admin)->ifIpAddress = strdup("127.0.0.1");
- }
+ if (if_ip != NULL) {
+ logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s as interface for multicast communication", if_ip);
+ (*admin)->ifIpAddress = if_ip;
+ } else {
+ (*admin)->ifIpAddress = strdup("127.0.0.1");
+ }
if (mc_ip != NULL) {
logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", mc_ip);
@@ -183,7 +189,7 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
celix_status_t status = CELIX_SUCCESS;
free(admin->mcIpAddress);
- free(admin->ifIpAddress);
+ free(admin->ifIpAddress);
celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
@@ -458,21 +464,21 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
celixThreadMutex_unlock(&admin->externalPublicationsLock);
}
- /* Connect the new publisher to the subscription for his topic, if there is any */
- celixThreadMutex_lock(&admin->subscriptionsLock);
+ /* Connect the new publisher to the subscription for his topic, if there is any */
+ celixThreadMutex_lock(&admin->subscriptionsLock);
- topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- if(sub!=NULL && pubEP->endpoint!=NULL){
- pubsub_topicSubscriptionConnectPublisher(sub,pubEP->endpoint);
- }
+ topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+ if(sub!=NULL && pubEP->endpoint!=NULL){
+ pubsub_topicSubscriptionConnectPublisher(sub,pubEP->endpoint);
+ }
- /* And check also for ANY subscription */
- topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
- if(any_sub!=NULL && pubEP->endpoint!=NULL){
- pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
- }
+ /* And check also for ANY subscription */
+ topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+ if(any_sub!=NULL && pubEP->endpoint!=NULL){
+ pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+ }
- celixThreadMutex_unlock(&admin->subscriptionsLock);
+ celixThreadMutex_unlock(&admin->subscriptionsLock);
/* Re-evaluate the pending subscriptions */
celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
@@ -491,9 +497,9 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
arrayList_destroy(pendingSubList);
free(key);
}
- free(scope_topic);
+ free(scope_topic);
- celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+ celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
return status;
@@ -542,14 +548,14 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
arrayList_remove(ext_pub_list,i);
}
}
- if(arrayList_size(ext_pub_list)==0){
- hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
- char* topic = (char*)hashMapEntry_getKey(entry);
- array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
- hashMap_remove(admin->externalPublications,scope_topic);
- arrayList_destroy(list);
- free(topic);
- }
+ if(arrayList_size(ext_pub_list)==0){
+ hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+ char* topic = (char*)hashMapEntry_getKey(entry);
+ array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+ hashMap_remove(admin->externalPublications,scope_topic);
+ arrayList_destroy(list);
+ free(topic);
+ }
}
celixThreadMutex_unlock(&admin->externalPublicationsLock);
@@ -702,7 +708,7 @@ static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt a
arrayList_create(&pendingListPerTopic);
hashMap_put(admin->pendingSubscriptions,scope_topic,pendingListPerTopic);
} else {
- free(scope_topic);
+ free(scope_topic);
}
arrayList_add(pendingListPerTopic,subEP);
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index 9a9fa55..ddda528 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -319,6 +319,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, voi
int status = 0;
publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+ celixThreadMutex_lock(&(bound->parent->tp_lock));
celixThreadMutex_lock(&(bound->mp_lock));
pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
@@ -350,14 +351,12 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, voi
msg->payload = (char *) serializedOutput;
msg->payloadSize = serializedOutputLen;
- celixThreadMutex_lock(&(bound->parent->tp_lock));
if(send_pubsub_msg(bound, msg,true, NULL) == false) {
status = -1;
}
free(msg_hdr);
free(msg);
free(serializedOutput);
- celixThreadMutex_unlock(&(bound->parent->tp_lock));
} else {
printf("TP: Message %u not supported.",msgTypeId);
@@ -365,6 +364,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, voi
}
celixThreadMutex_unlock(&(bound->mp_lock));
+ celixThreadMutex_unlock(&(bound->parent->tp_lock));
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 1a036db..5f6dcb5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -355,7 +355,9 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
bound->getCount++;
}
- *service = bound->service;
+ if(bound!=NULL){
+ *service = bound->service;
+ }
celixThreadMutex_unlock(&(publish->tp_lock));
@@ -528,6 +530,11 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
break;
}
+ /* Free msg in case we got into a bad branch */
+ if(status==-4){
+ free(msg);
+ }
+
if(!snd){
printf("TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/log_helper.c b/pubsub/pubsub_common/public/src/log_helper.c
index b18ef36..dbd1cc3 100644
--- a/pubsub/pubsub_common/public/src/log_helper.c
+++ b/pubsub/pubsub_common/public/src/log_helper.c
@@ -199,5 +199,7 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
}
}
+ va_end(listPointer);
+
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
index 8586203..ebb330e 100644
--- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -58,11 +58,7 @@ celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, cons
psEp->endpoint = strdup(endpoint);
}
- if (status != CELIX_SUCCESS) {
- pubsubEndpoint_destroy(psEp);
- } else {
- *out = psEp;
- }
+ *out = psEp;
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
index 0d8468e..13ba3aa 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -279,8 +279,6 @@ celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
watcher->running = false;
celixThreadMutex_unlock(&(watcher->watcherLock));
- watcher->running = false;
-
celixThread_join(watcher->watcherThread, NULL);
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index a6541b9..be57933 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -394,6 +394,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
if(pubsubEndpoint_createFromServiceReference(reference,&subcmp) == CELIX_SUCCESS){
int j,k;
+ celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->subscriptionsLock);
// Inform discoveries that we not interested in the topic any more
@@ -416,7 +417,6 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
for(j=0;j<arrayList_size(sub_list_by_topic);j++){
pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
if(pubsubEndpoint_equals(sub,subcmp)){
- celixThreadMutex_lock(&manager->psaListLock);
double highest_score = -1;
pubsub_admin_service_pt best_psa = NULL;
for(k=0;k<arrayList_size(manager->psaList);k++){
@@ -431,19 +431,16 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
if (best_psa != NULL){
best_psa->removeSubscription(best_psa->admin,sub);
}
- celixThreadMutex_unlock(&manager->psaListLock);
}
arrayList_remove(sub_list_by_topic,j);
/* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
if(arrayList_size(sub_list_by_topic)==0){
- celixThreadMutex_lock(&manager->psaListLock);
for(k=0;k<arrayList_size(manager->psaList);k++){
pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
}
- celixThreadMutex_unlock(&manager->psaListLock);
}
pubsubEndpoint_destroy(sub);
@@ -452,6 +449,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
}
celixThreadMutex_unlock(&manager->subscriptionsLock);
+ celixThreadMutex_unlock(&manager->psaListLock);
pubsubEndpoint_destroy(subcmp);
@@ -753,8 +751,9 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
pubsub_topology_manager_pt manager = handle;
- celixThreadMutex_lock(&manager->publicationsLock);
celixThreadMutex_lock(&manager->psaListLock);
+ celixThreadMutex_lock(&manager->publicationsLock);
+
int i;
char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
@@ -787,8 +786,8 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
status += best_psa->addPublication(best_psa->admin,p);
}
- celixThreadMutex_unlock(&manager->psaListLock);
celixThreadMutex_unlock(&manager->publicationsLock);
+ celixThreadMutex_unlock(&manager->psaListLock);
return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/remote_services/discovery/private/src/endpoint_discovery_poller.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery/private/src/endpoint_discovery_poller.c b/remote_services/discovery/private/src/endpoint_discovery_poller.c
index 0abbded..b92e9f5 100644
--- a/remote_services/discovery/private/src/endpoint_discovery_poller.c
+++ b/remote_services/discovery/private/src/endpoint_discovery_poller.c
@@ -247,9 +247,7 @@ celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt poller,
arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &updatedEndpoints);
status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints);
- if (status != CELIX_SUCCESS) {
- celixThreadMutex_unlock(&poller->pollerLock);
- } else {
+ if (status == CELIX_SUCCESS && updatedEndpoints!=NULL) {
if (updatedEndpoints) {
for (unsigned int i = arrayList_size(currentEndpoints); i > 0; i--) {
endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i - 1);
@@ -272,11 +270,9 @@ celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt poller,
}
}
- }
- }
- if (updatedEndpoints) {
- arrayList_destroy(updatedEndpoints);
+ arrayList_destroy(updatedEndpoints);
+ }
}
return status;
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index a09002a..ebeac4f 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -372,7 +372,9 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
celix_status_t status = CELIX_SUCCESS;
char localNodePath[MAX_LOCALNODE_LENGTH];
+ celixThreadMutex_lock(&watcher->watcherLock);
watcher->running = false;
+ celixThreadMutex_unlock(&watcher->watcherLock);
celixThread_join(watcher->watcherThread, NULL);
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/shell/private/src/uninstall_command.c
----------------------------------------------------------------------
diff --git a/shell/private/src/uninstall_command.c b/shell/private/src/uninstall_command.c
index 973adf2..fb30831 100644
--- a/shell/private/src/uninstall_command.c
+++ b/shell/private/src/uninstall_command.c
@@ -34,6 +34,7 @@ celix_status_t uninstallCommand_execute(void *handle, char * line, FILE *outStre
char delims[] = " ";
char * sub = NULL;
char *save_ptr = NULL;
+ celix_status_t status = CELIX_SUCCESS;
sub = strtok_r(line, delims, &save_ptr);
sub = strtok_r(NULL, delims, &save_ptr);
@@ -44,8 +45,8 @@ celix_status_t uninstallCommand_execute(void *handle, char * line, FILE *outStre
while (sub != NULL) {
long id = atol(sub);
bundle_pt bundle = NULL;
- bundleContext_getBundleById(context, id, &bundle);
- if (bundle != NULL) {
+ status = bundleContext_getBundleById(context, id, &bundle);
+ if (status==CELIX_SUCCESS && bundle!=NULL) {
bundle_uninstall(bundle);
} else {
fprintf(errStream, "Bundle id is invalid.");
@@ -53,5 +54,5 @@ celix_status_t uninstallCommand_execute(void *handle, char * line, FILE *outStre
sub = strtok_r(NULL, delims, &save_ptr);
}
}
- return CELIX_SUCCESS;
+ return status;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/3b1491e5/shell/private/src/update_command.c
----------------------------------------------------------------------
diff --git a/shell/private/src/update_command.c b/shell/private/src/update_command.c
index 0e45fd7..64999ac 100644
--- a/shell/private/src/update_command.c
+++ b/shell/private/src/update_command.c
@@ -49,8 +49,8 @@ void updateCommand_execute(void *handle, char * line, FILE *outStream, FILE *err
fprintf(errStream, "Incorrect number of arguments.\n");
} else {
long id = atol(sub);
- bundleContext_getBundleById(context, id, &bundle);
- if (bundle != NULL) {
+ celix_status_t ret = bundleContext_getBundleById(context, id, &bundle);
+ if (ret==CELIX_SUCCESS && bundle!=NULL) {
char inputFile[256];
sub = strtok_r(NULL, delims, &save_ptr);
inputFile[0] = '\0';