You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2015/09/02 13:44:16 UTC

[02/11] celix git commit: CELIX-252: added separate hashmap to double-check whether expired key belongs to imported services

CELIX-252: added separate hashmap to double-check whether expired key belongs to imported services


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c396aeed
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c396aeed
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c396aeed

Branch: refs/heads/feature/CELIX-237_rsa-ffi
Commit: c396aeed17b7618d5b2109a7a83f0c4213ce70f3
Parents: f32a233
Author: Bjoern Petri <bp...@apache.org>
Authored: Sun Aug 23 21:42:50 2015 +0200
Committer: Bjoern Petri <bp...@apache.org>
Committed: Sun Aug 23 21:42:50 2015 +0200

----------------------------------------------------------------------
 .../discovery_etcd/private/include/etcd.h       |   2 +-
 .../discovery_etcd/private/src/etcd.c           | 135 ++++++++++---------
 .../discovery_etcd/private/src/etcd_watcher.c   |  69 ++++++++--
 3 files changed, 136 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h
index e36fccb..f5624d0 100644
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ b/remote_services/discovery_etcd/private/include/etcd.h
@@ -54,6 +54,6 @@ bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
 bool etcd_getNodes(char* directory, char** nodeNames, int* size);
 bool etcd_set(char* key, char* value, int ttl, bool prevExist);
 bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value);
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex);
 
 #endif /* ETCD_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c
index 2c74856..d38f6bd 100644
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ b/remote_services/discovery_etcd/private/src/etcd.c
@@ -304,66 +304,79 @@ bool etcd_del(char* key) {
 }
 
 ///watch
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value) {
-	json_error_t error;
-	json_t* js_root = NULL;
-	json_t* js_node = NULL;
-	json_t* js_prevNode = NULL;
-	json_t* js_action = NULL;
-	json_t* js_value = NULL;
-	json_t* js_prevValue = NULL;
-	bool retVal = false;
-	char url[MAX_URL_LENGTH];
-	int res;
-	struct MemoryStruct reply;
-
-	reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-	reply.size = 0; /* no data at this point */
 
-	if (index != 0)
-		snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key,
-				index);
-	else
-		snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
-
-	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
-	if (res == CURLE_OK) {
-		js_root = json_loads(reply.memory, 0, &error);
-
-		if (js_root != NULL) {
-			js_action = json_object_get(js_root, ETCD_JSON_ACTION);
-			js_node = json_object_get(js_root, ETCD_JSON_NODE);
-			js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
-		}
-		if (js_prevNode != NULL) {
-			js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-		}
-		if (js_node != NULL) {
-			js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-		}
-		if (js_prevNode != NULL) {
-			js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-		}
-		if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-			strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
-		}
-		if ((js_value != NULL) && (json_is_string(js_value))) {
-			strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
-		}
-		if ((js_action != NULL) && (json_is_string(js_action))) {
-			strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
-
-			retVal = true;
-		}
-		if (js_root != NULL) {
-			json_decref(js_root);
-		}
-	}
-
-	if (reply.memory) {
-		free(reply.memory);
-	}
-
-	return retVal;
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex) {
+    json_error_t error;
+    json_t* js_root = NULL;
+    json_t* js_node = NULL;
+    json_t* js_prevNode = NULL;
+    json_t* js_action = NULL;
+    json_t* js_value = NULL;
+    json_t* js_rkey = NULL;
+    json_t* js_prevValue = NULL;
+    json_t* js_modIndex = NULL;
+    bool retVal = false;
+    char url[MAX_URL_LENGTH];
+    int res;
+    struct MemoryStruct reply;
+
+    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+    reply.size = 0; /* no data at this point */
+
+    if (index != 0)
+        snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, index);
+    else
+        snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
+
+    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+
+    if (res == CURLE_OK) {
+
+        js_root = json_loads(reply.memory, 0, &error);
+
+        if (js_root != NULL) {
+            js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+            js_node = json_object_get(js_root, ETCD_JSON_NODE);
+            js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+        }
+        if (js_prevNode != NULL) {
+            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+        }
+        if (js_node != NULL) {
+            js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
+            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+            js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
+        }
+        if (js_prevNode != NULL) {
+            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+        }
+        if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
+            strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
+        }
+        if ((js_value != NULL) && (json_is_string(js_value))) {
+            strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
+        }
+        if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
+            *modifiedIndex = json_integer_value(js_modIndex);
+        } else {
+            *modifiedIndex = index;
+        }
+
+        if ((js_rkey != NULL) && (js_action != NULL) && (json_is_string(js_rkey)) && (json_is_string(js_action))) {
+            strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH);
+            strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
+
+            retVal = true;
+        }
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+
+    return retVal;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index eefd28f..89be84e 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -31,6 +31,7 @@
 #include "log_helper.h"
 #include "log_service.h"
 #include "constants.h"
+#include "utils.h"
 #include "discovery.h"
 #include "discovery_impl.h"
 
@@ -42,6 +43,7 @@
 struct etcd_watcher {
     discovery_pt discovery;
     log_helper_pt* loghelper;
+    hash_map_pt entries;
 
 	celix_thread_mutex_t watcherLock;
 	celix_thread_t watcherThread;
@@ -200,6 +202,52 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
     return status;
 }
 
+
+
+
+static celix_status_t etcdWatcher_addEntry(etcd_watcher_pt watcher, char* key, char* value) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+	endpoint_discovery_poller_pt poller = watcher->discovery->poller;
+
+	if (!hashMap_containsKey(watcher->entries, key)) {
+		status = endpointDiscoveryPoller_addDiscoveryEndpoint(poller, value);
+
+		if (status == CELIX_SUCCESS) {
+			hashMap_put(watcher->entries, key, value);
+		}
+	}
+
+	return status;
+}
+
+
+static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key, char* value) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+	endpoint_discovery_poller_pt poller = watcher->discovery->poller;
+
+	if (hashMap_containsKey(watcher->entries, key)) {
+
+		hashMap_remove(watcher->entries, key);
+
+		// check if there is another entry with the same value
+		hash_map_iterator_pt iter = hashMapIterator_create(watcher->entries);
+		unsigned int valueFound = 0;
+
+		while (hashMapIterator_hasNext(iter) && valueFound <= 1) {
+			if (strcmp(value, hashMapIterator_nextValue(iter)) == 0)
+				valueFound++;
+		}
+
+		if (valueFound == 0)
+			status = endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, value);
+
+	}
+
+	return status;
+
+}
+
+
 /*
  * performs (blocking) etcd_watch calls to check for
  * changing discovery endpoint information within etcd.
@@ -211,29 +259,32 @@ static void* etcdWatcher_run(void* data) {
 	int highestModified = 0;
 
 	bundle_context_pt context = watcher->discovery->context;
-	endpoint_discovery_poller_pt poller = watcher->discovery->poller;
 
 	etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, &highestModified);
 	etcdWatcher_getRootPath(context, &rootPath[0]);
 
 	while (watcher->running) {
+
+        char rkey[MAX_KEY_LENGTH];
 		char value[MAX_VALUE_LENGTH];
 		char preValue[MAX_VALUE_LENGTH];
 		char action[MAX_ACTION_LENGTH];
+        int modIndex;
 
-		if (etcd_watch(rootPath, highestModified+1, &action[0], &preValue[0], &value[0]) == true) {
+		if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex) == true) {
 			if (strcmp(action, "set") == 0) {
-				endpointDiscoveryPoller_addDiscoveryEndpoint(poller, strdup(&value[0]));
+				etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
 			} else if (strcmp(action, "delete") == 0) {
-				endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+				etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
 			} else if (strcmp(action, "expire") == 0) {
-				endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+				etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
 			} else if (strcmp(action, "update") == 0) {
-				// TODO
+				etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
 			} else {
 				logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
 			}
-			highestModified++;
+
+			highestModified = modIndex;
 		}
 
 		// update own framework uuid
@@ -263,7 +314,6 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont
 		return CELIX_BUNDLE_EXCEPTION;
 	}
 
-
 	(*watcher) = calloc(1, sizeof(struct etcd_watcher));
 	if (!*watcher) {
 		return CELIX_ENOMEM;
@@ -272,6 +322,7 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont
 	{
 		(*watcher)->discovery = discovery;
 		(*watcher)->loghelper = &discovery->loghelper;
+		(*watcher)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 	}
 
 	if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
@@ -338,6 +389,8 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
 
 	watcher->loghelper = NULL;
 
+	hashMap_destroy(watcher->entries, true, true);
+
 	free(watcher);
 
 	return status;