You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by bp...@apache.org on 2015/09/28 12:02:57 UTC

[12/19] celix git commit: CELIX-257: refactored to allow endpoint poll when adding a new url

CELIX-257: refactored the endpoint discovery poller so that a newly added URL is polled immediately when it is added, instead of only on the next periodic poll


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/fad8ca25
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/fad8ca25
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/fad8ca25

Branch: refs/heads/feature/CELIX-247_android_support
Commit: fad8ca259b62ebaf7baa45103c8f418ab179f7d8
Parents: 1094446
Author: Bjoern Petri <bp...@apache.org>
Authored: Tue Sep 15 09:21:59 2015 +0200
Committer: Bjoern Petri <bp...@apache.org>
Committed: Tue Sep 15 09:21:59 2015 +0200

----------------------------------------------------------------------
 .../private/src/endpoint_discovery_poller.c     | 114 ++++++++++---------
 1 file changed, 63 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/fad8ca25/remote_services/discovery/private/src/endpoint_discovery_poller.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery/private/src/endpoint_discovery_poller.c b/remote_services/discovery/private/src/endpoint_discovery_poller.c
index ae269cf..ac819c5 100644
--- a/remote_services/discovery/private/src/endpoint_discovery_poller.c
+++ b/remote_services/discovery/private/src/endpoint_discovery_poller.c
@@ -43,7 +43,9 @@
 #define DISCOVERY_POLL_INTERVAL "DISCOVERY_CFG_POLL_INTERVAL"
 #define DEFAULT_POLL_INTERVAL "10"
 
-static void *endpointDiscoveryPoller_poll(void *data);
+
+static void *endpointDiscoveryPoller_performPeriodicPoll(void *data);
+celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt poller, char *url, array_list_pt currentEndpoints);
 static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints);
 static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void *endpointPtr, void *comparePtr, bool *equals);
 
@@ -99,7 +101,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_pt discovery, bundle_con
         return CELIX_BUNDLE_EXCEPTION;
     }
 
-	status = celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_poll, *poller);
+	status = celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_performPeriodicPoll, *poller);
 	if (status != CELIX_SUCCESS) {
 		return status;
 	}
@@ -187,6 +189,7 @@ celix_status_t endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_p
 		if (status == CELIX_SUCCESS) {
 			logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_DEBUG, "ENDPOINT_POLLER: add new discovery endpoint with url %s", url);
 			hashMap_put(poller->entries, strdup(url), endpoints);
+			endpointDiscoveryPoller_poll(poller, url, endpoints);
 		}
 	}
 
@@ -234,80 +237,89 @@ celix_status_t endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discover
 	return status;
 }
 
-static void *endpointDiscoveryPoller_poll(void *data) {
-    endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data;
 
-    useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
 
-    while (poller->running) {
-    	usleep(interval);
 
-        celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
-        if (status != CELIX_SUCCESS) {
-        	logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to obtain lock; retrying...");
-        	continue;
-        }
+celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt poller, char *url, array_list_pt currentEndpoints) {
+	celix_status_t status = NULL;
+	array_list_pt updatedEndpoints = NULL;
 
-		hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
+	// create an arraylist with a custom equality test to ensure we can find endpoints properly...
+	arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &updatedEndpoints);
+	status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints);
 
-		while (hashMapIterator_hasNext(iterator)) {
-			hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
+	if (status != CELIX_SUCCESS) {
+		status = celixThreadMutex_unlock(&poller->pollerLock);
+	} else {
+		if (updatedEndpoints) {
+			for (unsigned int i = arrayList_size(currentEndpoints); i > 0; i--) {
+				endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i - 1);
+
+				if (!arrayList_contains(updatedEndpoints, endpoint)) {
+					status = discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
+					arrayList_remove(currentEndpoints, i - 1);
+					endpointDescription_destroy(endpoint);
+				}
+			}
 
-			char *url = hashMapEntry_getKey(entry);
-			array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
+			for (int i = arrayList_size(updatedEndpoints); i > 0; i--) {
+				endpoint_description_pt endpoint = arrayList_remove(updatedEndpoints, 0);
 
-			array_list_pt updatedEndpoints = NULL;
-			// create an arraylist with a custom equality test to ensure we can find endpoints properly...
-			arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &updatedEndpoints);
-			status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints);
+				if (!arrayList_contains(currentEndpoints, endpoint)) {
+					arrayList_add(currentEndpoints, endpoint);
+					status = discovery_addDiscoveredEndpoint(poller->discovery, endpoint);
+				} else {
+					endpointDescription_destroy(endpoint);
 
-			if (status != CELIX_SUCCESS) {
-				status = celixThreadMutex_unlock(&poller->pollerLock);
-				continue;
+				}
 			}
+		}
 
-			if (updatedEndpoints) {
-				for (unsigned int i = arrayList_size(currentEndpoints); i > 0  ; i--) {
-					endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i-1);
+		if (updatedEndpoints) {
+			arrayList_destroy(updatedEndpoints);
+		}
+	}
 
-					if (!arrayList_contains(updatedEndpoints, endpoint)) {
-						status = discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
-						arrayList_remove(currentEndpoints, i-1);
-						endpointDescription_destroy(endpoint);
-					}
-				}
+	return status;
+}
 
-				for (int i = arrayList_size(updatedEndpoints); i > 0  ; i--) {
-					endpoint_description_pt endpoint = arrayList_remove(updatedEndpoints, 0);
+static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) {
+	endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data;
 
-					if (!arrayList_contains(currentEndpoints, endpoint)) {
-						arrayList_add(currentEndpoints, endpoint);
-						status = discovery_addDiscoveredEndpoint(poller->discovery, endpoint);
-					}
-					else {
-						endpointDescription_destroy(endpoint);
+	useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
 
-					}
-				}
-			}
+	while (poller->running) {
+		usleep(interval);
+		celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
+
+		if (status != CELIX_SUCCESS) {
+			logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to obtain lock; retrying...");
+		} else {
+			hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
+
+			while (hashMapIterator_hasNext(iterator)) {
+				hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
+
+				char *url = hashMapEntry_getKey(entry);
+				array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
 
-			if (updatedEndpoints) {
-				arrayList_destroy(updatedEndpoints);
+				endpointDiscoveryPoller_poll(poller, url, currentEndpoints);
 			}
 
+			hashMapIterator_destroy(iterator);
 		}
 
-		hashMapIterator_destroy(iterator);
-
 		status = celixThreadMutex_unlock(&poller->pollerLock);
 		if (status != CELIX_SUCCESS) {
 			logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to release lock; retrying...");
 		}
-    }
+	}
 
-    return NULL;
+	return NULL;
 }
 
+
+
 struct MemoryStruct {
   char *memory;
   size_t size;
@@ -366,7 +378,7 @@ static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_po
 			endpointDescriptorReader_destroy(reader);
     	}
     } else {
-    	logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_ERROR, "ENDPOINT_POLLER: unable to read endpoints, reason: %s", curl_easy_strerror(res));
+    	logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_ERROR, "ENDPOINT_POLLER: unable to read endpoints from %s, reason: %s", url, curl_easy_strerror(res));
     }
 
     // clean up endpoints file