You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/01/27 10:52:09 UTC
celix git commit: CELIX-395: discovery_etcd using etcdlib now
Repository: celix
Updated Branches:
refs/heads/develop 4e665476c -> d62731a3c
CELIX-395: discovery_etcd using etcdlib now
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/d62731a3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/d62731a3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/d62731a3
Branch: refs/heads/develop
Commit: d62731a3cfe482d6796fea3fbd386091756f3b2a
Parents: 4e66547
Author: Roy Lenferink <le...@gmail.com>
Authored: Fri Jan 27 10:27:48 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Fri Jan 27 11:35:28 2017 +0100
----------------------------------------------------------------------
remote_services/discovery_etcd/CMakeLists.txt | 4 +-
.../private/include/discovery_impl.h | 4 +-
.../discovery_etcd/private/include/etcd.h | 61 ---
.../discovery_etcd/private/src/etcd.c | 397 -------------------
.../discovery_etcd/private/src/etcd_watcher.c | 168 ++++----
remote_services/examples/CMakeLists.txt | 4 +-
6 files changed, 83 insertions(+), 555 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/CMakeLists.txt b/remote_services/discovery_etcd/CMakeLists.txt
index 31ba269..442d486 100644
--- a/remote_services/discovery_etcd/CMakeLists.txt
+++ b/remote_services/discovery_etcd/CMakeLists.txt
@@ -24,6 +24,7 @@ if (RSA_DISCOVERY_ETCD)
include_directories("${CURL_INCLUDE_DIR}")
include_directories("${JANSSON_INCLUDE_DIR}")
include_directories("${LIBXML2_INCLUDE_DIR}")
+ include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
@@ -40,7 +41,6 @@ if (RSA_DISCOVERY_ETCD)
NAME "Apache Celix RSA Discovery ETCD"
SOURCES
private/src/discovery_impl.c
- private/src/etcd.c
private/src/etcd_watcher.c
${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c
${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c
@@ -56,6 +56,6 @@ if (RSA_DISCOVERY_ETCD)
install_bundle(discovery_etcd)
- target_link_libraries(discovery_etcd celix_framework ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
+ target_link_libraries(discovery_etcd celix_framework etcdlib ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
endif (RSA_DISCOVERY_ETCD)
http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/discovery_impl.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/discovery_impl.h b/remote_services/discovery_etcd/private/include/discovery_impl.h
index e7e1071..a19b145 100644
--- a/remote_services/discovery_etcd/private/include/discovery_impl.h
+++ b/remote_services/discovery_etcd/private/include/discovery_impl.h
@@ -45,9 +45,7 @@
#define DEFAULT_POLL_ENDPOINTS ""
-#define MAX_ROOTNODE_LENGTH 64
-#define MAX_LOCALNODE_LENGTH 256
-
+#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
struct discovery {
bundle_context_pt context;
http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h
deleted file mode 100644
index 2ba09de..0000000
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * etcd.h
- *
- * \date 26 Jul 2014
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-
-#ifndef ETCD_H_
-#define ETCD_H_
-
-#include <stdbool.h>
-#include <celix_errno.h>
-
-#define MAX_NODES 256
-
-#define MAX_KEY_LENGTH 256
-#define MAX_VALUE_LENGTH 256
-#define MAX_ACTION_LENGTH 64
-
-#define MAX_URL_LENGTH 256
-#define MAX_CONTENT_LENGTH 1024
-
-#define ETCD_JSON_NODE "node"
-#define ETCD_JSON_PREVNODE "prevNode"
-#define ETCD_JSON_NODES "nodes"
-#define ETCD_JSON_ACTION "action"
-#define ETCD_JSON_KEY "key"
-#define ETCD_JSON_VALUE "value"
-#define ETCD_JSON_MODIFIEDINDEX "modifiedIndex"
-#define ETCD_ERROR_INDICATION "errorCode"
-#define ETCD_INDEX "index"
-
-celix_status_t etcd_init(char* server, int port);
-bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
-bool etcd_getNodes(char* directory, char** nodeNames, int* size);
-bool etcd_set(char* key, char* value, int ttl, bool prevExist);
-bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex, int *error);
-
-#endif /* ETCD_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c
deleted file mode 100644
index 1b74f66..0000000
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * etcd.c
- *
- * \date 26 Jul 2014
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-
-#include <stdio.h>
-#include <stdbool.h>
-#include <string.h>
-
-#include <curl/curl.h>
-#include <jansson.h>
-
-#include "etcd.h"
-
-#define DEFAULT_CURL_TIMEOUT 10
-#define DEFAULT_CURL_CONECTTIMEOUT 10
-
-typedef enum {
- GET, PUT, DELETE
-} request_t;
-
-static char* etcd_server = NULL;
-static int etcd_port = 0;
-
-struct MemoryStruct {
- char *memory;
- size_t size;
-};
-
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
- size_t realsize = size * nmemb;
- struct MemoryStruct *mem = (struct MemoryStruct *) userp;
-
- mem->memory = realloc(mem->memory, mem->size + realsize + 1);
- if (mem->memory == NULL) {
- /* out of memory! */
- printf("not enough memory (realloc returned NULL)\n");
- return 0;
- }
-
- memcpy(&(mem->memory[mem->size]), contents, realsize);
- mem->size += realsize;
- mem->memory[mem->size] = 0;
-
- return realsize;
-}
-
-static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
- CURL *curl = NULL;
- CURLcode res = 0;
-
- curl = curl_easy_init();
- curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
- curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
- curl_easy_setopt(curl, CURLOPT_URL, url);
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
- curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
-
- if (request == PUT) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
- curl_easy_setopt(curl, CURLOPT_POST, 1L);
-// curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-type: application/json");
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
- } else if (request == DELETE) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
- } else if (request == GET) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
- }
-
- res = curl_easy_perform(curl);
- curl_easy_cleanup(curl);
-
- return res;
-}
-
-// open
-celix_status_t etcd_init(char* server, int port) {
- celix_status_t status = CELIX_SUCCESS;
-
- etcd_server = server;
- etcd_port = port;
-
- return status;
-}
-
-// get
-bool etcd_get(char* key, char* value, char* action, int* modifiedIndex) {
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_value = NULL;
- json_t* js_modifiedIndex = NULL;
- json_error_t error;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- bool retVal = false;
- char url[MAX_URL_LENGTH];
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- js_modifiedIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
-
- if (js_modifiedIndex != NULL) {
- *modifiedIndex = json_integer_value(js_modifiedIndex);
- }
-
- if (js_value != NULL) {
- snprintf(value, MAX_VALUE_LENGTH, "%s", json_string_value(js_value));
- retVal = true;
- }
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
-
- return retVal;
-}
-
-// getNodes
-bool etcd_getNodes(char* directory, char** nodeNames, int* size) {
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_nodes = NULL;
- json_error_t error;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- bool retVal = false;
- char url[MAX_URL_LENGTH];
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, directory);
-
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_nodes = json_object_get(js_node, ETCD_JSON_NODES);
- }
-
- if (js_nodes != NULL && json_is_array(js_nodes)) {
- int i = 0;
- retVal = true;
-
- for (i = 0; i < json_array_size(js_nodes) && i < MAX_NODES; i++) {
- json_t* js_node = json_array_get(js_nodes, i);
-
- if (!json_is_object(js_node)) {
- retVal = false;
- } else {
- json_t* js_key = json_object_get(js_node, ETCD_JSON_KEY);
- snprintf(nodeNames[i], MAX_KEY_LENGTH, "%s", json_string_value(js_key));
- }
- }
- *size = i;
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
-
-
-
-bool etcd_set(char* key, char* value, int ttl, bool prevExist) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_value = NULL;
- bool retVal = false;
- char url[MAX_URL_LENGTH];
- char request[MAX_CONTENT_LENGTH];
- char* cur = request;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
- cur += snprintf(cur, MAX_CONTENT_LENGTH, "value=%s", value);
-
- if (ttl > 0)
- cur += snprintf(cur, MAX_CONTENT_LENGTH, ";ttl=%d", ttl);
-
- if (prevExist)
- cur += snprintf(cur, MAX_CONTENT_LENGTH, ";prevExist=true");
-
- res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- }
- if (js_value != NULL && json_is_string(js_value)) {
- retVal = (strcmp(json_string_value(js_value), value) == 0);
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
-
-
-
-//delete
-bool etcd_del(char* key) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- bool retVal = false;
- char url[MAX_URL_LENGTH];
- char request[MAX_CONTENT_LENGTH];
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
- res = performRequest(url, DELETE, WriteMemoryCallback, request, (void*) &reply);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
-
- retVal = (js_node != NULL);
-
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
-
- return retVal;
-}
-
-///watch
-
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex, int* errorCode) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_prevNode = NULL;
- json_t* js_action = NULL;
- json_t* js_value = NULL;
- json_t* js_rkey = NULL;
- json_t* js_prevValue = NULL;
- json_t* js_modIndex = NULL;
- json_t* js_error = NULL; // used to indicate valid json response with ETCD error indication
- bool retVal = false;
- *errorCode = 0;
- char url[MAX_URL_LENGTH];
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- if (index != 0)
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, index);
- else
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
-
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
- if (res == CURLE_OK) {
-
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_action = json_object_get(js_root, ETCD_JSON_ACTION);
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
- js_error = json_object_get(js_root, ETCD_ERROR_INDICATION);
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if (js_node != NULL) {
- js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
- strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
- }
- if ((js_value != NULL) && (json_is_string(js_value))) {
- strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
- }
- if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
- *modifiedIndex = json_integer_value(js_modIndex);
- } else {
- *modifiedIndex = index;
- }
-
- if ((js_rkey != NULL) && (js_action != NULL) && (json_is_string(js_rkey)) && (json_is_string(js_action))) {
- strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH);
- strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
-
- retVal = true;
- }
- if ((js_error != NULL) && (json_is_integer(js_error))) {
- *errorCode = json_integer_value(js_error);
- js_modIndex = json_object_get(js_root, ETCD_INDEX);
- if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
- *modifiedIndex = json_integer_value(js_modIndex);
- }
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
-
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index 7e1ce33..a09002a 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -26,6 +26,7 @@
#include <stdbool.h>
#include <stdlib.h>
+#include <unistd.h>
#include <string.h>
#include "log_helper.h"
@@ -35,6 +36,7 @@
#include "discovery.h"
#include "discovery_impl.h"
+#include <curl/curl.h>
#include "etcd.h"
#include "etcd_watcher.h"
@@ -51,18 +53,23 @@ struct etcd_watcher {
volatile bool running;
};
-#define CFG_ETCD_ROOT_PATH "DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH "discovery"
-#define CFG_ETCD_SERVER_IP "DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+#define MAX_ROOTNODE_LENGTH 128
+#define MAX_LOCALNODE_LENGTH 4096
+#define MAX_VALUE_LENGTH 256
-#define CFG_ETCD_SERVER_PORT "DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
+#define CFG_ETCD_ROOT_PATH "DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH "discovery"
+
+#define CFG_ETCD_SERVER_IP "DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT "DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
+#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
// note that the rootNode shouldn't have a leading slash
@@ -71,36 +78,41 @@ static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* r
const char* rootPath = NULL;
if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
- strcpy(rootNode, DEFAULT_ETCD_ROOTPATH);
+ strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
}
else {
- strcpy(rootNode, rootPath);
+ strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
}
return status;
}
-
static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) {
celix_status_t status = CELIX_SUCCESS;
char rootPath[MAX_ROOTNODE_LENGTH];
const char* uuid = NULL;
- if ((etcdWatcher_getRootPath(context, &rootPath[0]) != CELIX_SUCCESS)) {
+ if ((etcdWatcher_getRootPath(context, rootPath) != CELIX_SUCCESS)) {
status = CELIX_ILLEGAL_STATE;
}
else if (((bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid)) != CELIX_SUCCESS) || (!uuid)) {
status = CELIX_ILLEGAL_STATE;
}
- else if (rootPath[strlen(&rootPath[0]) - 1] == '/') {
- snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", &rootPath[0], uuid);
+ else if (rootPath[strlen(rootPath) - 1] == '/') {
+ snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", rootPath, uuid);
}
else {
- snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", &rootPath[0], uuid);
+ snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", rootPath, uuid);
}
return status;
}
+
+static void add_node(const char *key, const char *value, void* arg) {
+ discovery_pt discovery = (discovery_pt) arg;
+ endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, (char *) value);
+}
+
/*
* retrieves all already existing discovery endpoints
* from etcd and adds them to the poller.
@@ -108,44 +120,18 @@ static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, ch
* returns the modifiedIndex of the last modified
* discovery endpoint (see etcd documentation).
*/
-static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt discovery, int* highestModified) {
+static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt discovery, long long* highestModified) {
celix_status_t status = CELIX_SUCCESS;
- char** nodeArr = calloc(MAX_NODES, sizeof(*nodeArr));
- char rootPath[MAX_ROOTNODE_LENGTH];
- int i, size;
-
- *highestModified = -1;
- for (i = 0; i < MAX_NODES; i++) {
- nodeArr[i] = calloc(MAX_KEY_LENGTH, sizeof(*nodeArr[i]));
- }
+ char rootPath[MAX_ROOTNODE_LENGTH];
+ status = etcdWatcher_getRootPath(discovery->context, rootPath);
- // we need to go though all nodes and get the highest modifiedIndex
- if (((status = etcdWatcher_getRootPath(discovery->context, &rootPath[0])) == CELIX_SUCCESS) &&
- (etcd_getNodes(rootPath, nodeArr, &size) == true)) {
- for (i = 0; i < size; i++) {
- char* key = nodeArr[i];
- char value[MAX_VALUE_LENGTH];
- char action[MAX_VALUE_LENGTH];
- int modIndex;
-
- if (etcd_get(key, &value[0], &action[0], &modIndex) == true) {
- // TODO: check that this is not equals to the local endpoint
- endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, &value[0]);
-
- if (modIndex > *highestModified) {
- *highestModified = modIndex;
- }
- }
+ if (status == CELIX_SUCCESS) {
+ if(etcd_get_directory(rootPath, add_node, discovery, highestModified)) {
+ status = CELIX_ILLEGAL_ARGUMENT;
}
}
- for (i = 0; i < MAX_NODES; i++) {
- free(nodeArr[i]);
- }
-
- free(nodeArr);
-
return status;
}
@@ -154,8 +140,7 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
{
celix_status_t status = CELIX_BUNDLE_EXCEPTION;
char localNodePath[MAX_LOCALNODE_LENGTH];
- char value[MAX_VALUE_LENGTH];
- char action[MAX_VALUE_LENGTH];
+ char *value;
char url[MAX_VALUE_LENGTH];
int modIndex;
char* endpoints = NULL;
@@ -166,30 +151,30 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
endpoint_discovery_server_pt server = watcher->discovery->server;
// register own framework
- if ((status = etcdWatcher_getLocalNodePath(context, &localNodePath[0])) != CELIX_SUCCESS) {
+ if ((status = etcdWatcher_getLocalNodePath(context, localNodePath)) != CELIX_SUCCESS) {
return status;
}
- if (endpointDiscoveryServer_getUrl(server, &url[0]) != CELIX_SUCCESS) {
+ if (endpointDiscoveryServer_getUrl(server, url) != CELIX_SUCCESS) {
snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s", DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
}
- endpoints = &url[0];
+ endpoints = url;
if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
ttl = DEFAULT_ETCD_TTL;
}
else
{
- char* endptr = (char*)ttlStr;
+ char* endptr = (char *) ttlStr;
errno = 0;
- ttl = strtol(ttlStr, &endptr, 10);
+ ttl = strtol(ttlStr, &endptr, 10);
if (*endptr || errno != 0) {
ttl = DEFAULT_ETCD_TTL;
}
}
- if (etcd_get(localNodePath, &value[0], &action[0], &modIndex) != true) {
+ if (etcd_get(localNodePath, &value, &modIndex) != true) {
etcd_set(localNodePath, endpoints, ttl, false);
}
else if (etcd_set(localNodePath, endpoints, ttl, true) == false) {
@@ -199,6 +184,8 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
status = CELIX_SUCCESS;
}
+ FREE_MEM(value);
+
return status;
}
@@ -262,52 +249,47 @@ static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key
static void* etcdWatcher_run(void* data) {
etcd_watcher_pt watcher = (etcd_watcher_pt) data;
time_t timeBeforeWatch = time(NULL);
- static char rootPath[MAX_ROOTNODE_LENGTH];
- int highestModified = 0;
- int errorCode=0;
+ char rootPath[MAX_ROOTNODE_LENGTH];
+ long long highestModified = 0;
bundle_context_pt context = watcher->discovery->context;
etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, &highestModified);
- etcdWatcher_getRootPath(context, &rootPath[0]);
+ etcdWatcher_getRootPath(context, rootPath);
while (watcher->running) {
- char rkey[MAX_KEY_LENGTH];
- char value[MAX_VALUE_LENGTH];
- char preValue[MAX_VALUE_LENGTH];
- char action[MAX_ACTION_LENGTH];
- int modIndex=0;
-
- if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex, &errorCode) == true) {
- if (strcmp(action, "set") == 0) {
- etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
- } else if (strcmp(action, "delete") == 0) {
- etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
- } else if (strcmp(action, "expire") == 0) {
- etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
- } else if (strcmp(action, "update") == 0) {
- etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
- } else {
- logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
- }
+ char *rkey = NULL;
+ char *value = NULL;
+ char *preValue = NULL;
+ char *action = NULL;
+ long long modIndex;
+
+ if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+ if (strcmp(action, "set") == 0) {
+ etcdWatcher_addEntry(watcher, rkey, value);
+ } else if (strcmp(action, "delete") == 0) {
+ etcdWatcher_removeEntry(watcher, rkey, value);
+ } else if (strcmp(action, "expire") == 0) {
+ etcdWatcher_removeEntry(watcher, rkey, value);
+ } else if (strcmp(action, "update") == 0) {
+ etcdWatcher_addEntry(watcher, rkey, value);
+ } else {
+ logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
+ }
- highestModified = modIndex;
- }
- /* prevent busy waiting, in case etcd_watch returns false */
- else {
- switch (errorCode) {
- case 401:
- // Etcd can store at most 1000 events
highestModified = modIndex;
- break;
- default:
- break;
- }
+ } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+ sleep(DEFAULT_ETCD_TTL / 4);
}
+ FREE_MEM(action);
+ FREE_MEM(value);
+ FREE_MEM(preValue);
+ FREE_MEM(rkey);
+
// update own framework uuid
- if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL/2)) {
+ if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
etcdWatcher_addOwnFramework(watcher);
timeBeforeWatch = time(NULL);
}
@@ -361,7 +343,11 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont
}
}
- status = etcd_init((char*)etcd_server, etcd_port);
+ if (etcd_init((char*) etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ status = CELIX_SUCCESS;
+ }
if (status == CELIX_SUCCESS) {
etcdWatcher_addOwnFramework(*watcher);
@@ -391,7 +377,7 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
celixThread_join(watcher->watcherThread, NULL);
// register own framework
- status = etcdWatcher_getLocalNodePath(watcher->discovery->context, &localNodePath[0]);
+ status = etcdWatcher_getLocalNodePath(watcher->discovery->context, localNodePath);
if (status != CELIX_SUCCESS || etcd_del(localNodePath) == false)
{
http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/remote_services/examples/CMakeLists.txt b/remote_services/examples/CMakeLists.txt
index f24fa6f..44b7733 100644
--- a/remote_services/examples/CMakeLists.txt
+++ b/remote_services/examples/CMakeLists.txt
@@ -83,7 +83,9 @@ if (RSA_EXAMPLES)
BUNDLES discovery_etcd topology_manager remote_service_admin_http calculator shell shell_tui log_service log_writer
)
deploy_bundles_dir(remote-services-etcd DIR_NAME "endpoints"
- BUNDLES org.apache.celix.calc.api.Calculator_endpoint
+ BUNDLES
+ org.apache.celix.calc.api.Calculator_endpoint
+ org.apache.celix.calc.api.Calculator2_endpoint
)
add_deploy("remote-services-etcd-client"