You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2015/09/02 13:44:16 UTC
[02/11] celix git commit: CELIX-252: added separate hashmap to
double-check whether expired key belongs to imported services
CELIX-252: added separate hashmap to double-check whether expired key belongs to imported services
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c396aeed
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c396aeed
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c396aeed
Branch: refs/heads/feature/CELIX-237_rsa-ffi
Commit: c396aeed17b7618d5b2109a7a83f0c4213ce70f3
Parents: f32a233
Author: Bjoern Petri <bp...@apache.org>
Authored: Sun Aug 23 21:42:50 2015 +0200
Committer: Bjoern Petri <bp...@apache.org>
Committed: Sun Aug 23 21:42:50 2015 +0200
----------------------------------------------------------------------
.../discovery_etcd/private/include/etcd.h | 2 +-
.../discovery_etcd/private/src/etcd.c | 135 ++++++++++---------
.../discovery_etcd/private/src/etcd_watcher.c | 69 ++++++++--
3 files changed, 136 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h
index e36fccb..f5624d0 100644
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ b/remote_services/discovery_etcd/private/include/etcd.h
@@ -54,6 +54,6 @@ bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
bool etcd_getNodes(char* directory, char** nodeNames, int* size);
bool etcd_set(char* key, char* value, int ttl, bool prevExist);
bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value);
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex);
#endif /* ETCD_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c
index 2c74856..d38f6bd 100644
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ b/remote_services/discovery_etcd/private/src/etcd.c
@@ -304,66 +304,79 @@ bool etcd_del(char* key) {
}
///watch
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_prevNode = NULL;
- json_t* js_action = NULL;
- json_t* js_value = NULL;
- json_t* js_prevValue = NULL;
- bool retVal = false;
- char url[MAX_URL_LENGTH];
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
- if (index != 0)
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key,
- index);
- else
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
-
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_action = json_object_get(js_root, ETCD_JSON_ACTION);
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
- strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
- }
- if ((js_value != NULL) && (json_is_string(js_value))) {
- strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
- }
- if ((js_action != NULL) && (json_is_string(js_action))) {
- strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
-
- retVal = true;
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex) {
+ json_error_t error;
+ json_t* js_root = NULL;
+ json_t* js_node = NULL;
+ json_t* js_prevNode = NULL;
+ json_t* js_action = NULL;
+ json_t* js_value = NULL;
+ json_t* js_rkey = NULL;
+ json_t* js_prevValue = NULL;
+ json_t* js_modIndex = NULL;
+ bool retVal = false;
+ char url[MAX_URL_LENGTH];
+ int res;
+ struct MemoryStruct reply;
+
+ reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+ reply.size = 0; /* no data at this point */
+
+ if (index != 0)
+ snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, index);
+ else
+ snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
+
+ res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+
+ if (res == CURLE_OK) {
+
+ js_root = json_loads(reply.memory, 0, &error);
+
+ if (js_root != NULL) {
+ js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+ js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+ }
+ if (js_prevNode != NULL) {
+ js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+ }
+ if (js_node != NULL) {
+ js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
+ js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+ js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
+ }
+ if (js_prevNode != NULL) {
+ js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+ }
+ if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
+ strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
+ }
+ if ((js_value != NULL) && (json_is_string(js_value))) {
+ strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
+ }
+ if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
+ *modifiedIndex = json_integer_value(js_modIndex);
+ } else {
+ *modifiedIndex = index;
+ }
+
+ if ((js_rkey != NULL) && (js_action != NULL) && (json_is_string(js_rkey)) && (json_is_string(js_action))) {
+ strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH);
+ strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
+
+ retVal = true;
+ }
+ if (js_root != NULL) {
+ json_decref(js_root);
+ }
+
+ }
+
+ if (reply.memory) {
+ free(reply.memory);
+ }
+
+ return retVal;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index eefd28f..89be84e 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -31,6 +31,7 @@
#include "log_helper.h"
#include "log_service.h"
#include "constants.h"
+#include "utils.h"
#include "discovery.h"
#include "discovery_impl.h"
@@ -42,6 +43,7 @@
struct etcd_watcher {
discovery_pt discovery;
log_helper_pt* loghelper;
+ hash_map_pt entries;
celix_thread_mutex_t watcherLock;
celix_thread_t watcherThread;
@@ -200,6 +202,52 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
return status;
}
+
+
+
+static celix_status_t etcdWatcher_addEntry(etcd_watcher_pt watcher, char* key, char* value) {
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+ endpoint_discovery_poller_pt poller = watcher->discovery->poller;
+
+ if (!hashMap_containsKey(watcher->entries, key)) {
+ status = endpointDiscoveryPoller_addDiscoveryEndpoint(poller, value);
+
+ if (status == CELIX_SUCCESS) {
+ hashMap_put(watcher->entries, key, value);
+ }
+ }
+
+ return status;
+}
+
+
+static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key, char* value) {
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+ endpoint_discovery_poller_pt poller = watcher->discovery->poller;
+
+ if (hashMap_containsKey(watcher->entries, key)) {
+
+ hashMap_remove(watcher->entries, key);
+
+ // check if there is another entry with the same value
+ hash_map_iterator_pt iter = hashMapIterator_create(watcher->entries);
+ unsigned int valueFound = 0;
+
+ while (hashMapIterator_hasNext(iter) && valueFound <= 1) {
+ if (strcmp(value, hashMapIterator_nextValue(iter)) == 0)
+ valueFound++;
+ }
+
+ if (valueFound == 0)
+ status = endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, value);
+
+ }
+
+ return status;
+
+}
+
+
/*
* performs (blocking) etcd_watch calls to check for
* changing discovery endpoint information within etcd.
@@ -211,29 +259,32 @@ static void* etcdWatcher_run(void* data) {
int highestModified = 0;
bundle_context_pt context = watcher->discovery->context;
- endpoint_discovery_poller_pt poller = watcher->discovery->poller;
etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, &highestModified);
etcdWatcher_getRootPath(context, &rootPath[0]);
while (watcher->running) {
+
+ char rkey[MAX_KEY_LENGTH];
char value[MAX_VALUE_LENGTH];
char preValue[MAX_VALUE_LENGTH];
char action[MAX_ACTION_LENGTH];
+ int modIndex;
- if (etcd_watch(rootPath, highestModified+1, &action[0], &preValue[0], &value[0]) == true) {
+ if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex) == true) {
if (strcmp(action, "set") == 0) {
- endpointDiscoveryPoller_addDiscoveryEndpoint(poller, strdup(&value[0]));
+ etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
} else if (strcmp(action, "delete") == 0) {
- endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+ etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
} else if (strcmp(action, "expire") == 0) {
- endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+ etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
} else if (strcmp(action, "update") == 0) {
- // TODO
+ etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
} else {
logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
}
- highestModified++;
+
+ highestModified = modIndex;
}
// update own framework uuid
@@ -263,7 +314,6 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont
return CELIX_BUNDLE_EXCEPTION;
}
-
(*watcher) = calloc(1, sizeof(struct etcd_watcher));
if (!*watcher) {
return CELIX_ENOMEM;
@@ -272,6 +322,7 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont
{
(*watcher)->discovery = discovery;
(*watcher)->loghelper = &discovery->loghelper;
+ (*watcher)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
@@ -338,6 +389,8 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
watcher->loghelper = NULL;
+ hashMap_destroy(watcher->entries, true, true);
+
free(watcher);
return status;