You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:19 UTC
[07/19] celix git commit: CELIX-399: Pubsub discovery using etcdlib now
CELIX-399: Pubsub discovery using etcdlib now
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1c42f1d2
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1c42f1d2
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1c42f1d2
Branch: refs/heads/develop
Commit: 1c42f1d285d5e9c1f095fe79b0f109ccc0b0dda9
Parents: 8c84284
Author: Roy Lenferink <le...@gmail.com>
Authored: Mon Feb 6 17:44:55 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Mon Feb 6 17:44:55 2017 +0100
----------------------------------------------------------------------
.../pubsub/pubsub_common/public/include/etcd.h | 39 --
.../pubsub/pubsub_common/public/src/etcd.c | 474 -------------------
.../pubsub/pubsub_discovery/CMakeLists.txt | 4 +-
.../pubsub_discovery/private/src/etcd_common.c | 4 +-
4 files changed, 4 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h b/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h
deleted file mode 100644
index a502df4..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_H_
-#define ETCD_H_
-
-#include <stdbool.h>
-
-typedef void (*etcd_key_value_callback) (const char *key, const char *value, void* arg);
-
-int etcd_init(const char* server, int port);
-
-int etcd_get(const char* key, char** value, int* modifiedIndex);
-int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void *arg, long long* modifiedIndex);
-
-int etcd_set(const char* key, const char* value, int ttl, bool prevExist);
-int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write);
-
-int etcd_del(const char* key);
-
-int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex);
-
-#endif /* ETCD_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
deleted file mode 100644
index 99ec87a..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdio.h>
-#include <stdbool.h>
-#include <string.h>
-#include <curl/curl.h>
-#include <jansson.h>
-#include "etcd.h"
-
-#define ETCD_JSON_NODE "node"
-#define ETCD_JSON_PREVNODE "prevNode"
-#define ETCD_JSON_NODES "nodes"
-#define ETCD_JSON_ACTION "action"
-#define ETCD_JSON_KEY "key"
-#define ETCD_JSON_VALUE "value"
-#define ETCD_JSON_DIR "dir"
-#define ETCD_JSON_MODIFIEDINDEX "modifiedIndex"
-
-#define MAX_OVERHEAD_LENGTH 64
-#define DEFAULT_CURL_TIMEOUT 10
-#define DEFAULT_CURL_CONECTTIMEOUT 10
-
-typedef enum {
- GET, PUT, DELETE
-} request_t;
-
-static const char* etcd_server;
-static int etcd_port = 0;
-
-struct MemoryStruct {
- char *memory;
- size_t size;
-};
-
-
-/**
- * Static function declarations
- */
-static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData);
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp);
-/**
- * External function definition
- */
-
-
-/**
- * etcd_init
- */
-int etcd_init(const char* server, int port) {
- etcd_server = server;
- etcd_port = port;
-
- return curl_global_init(CURL_GLOBAL_ALL) != 0;
-}
-
-
-/**
- * etcd_get
- */
-int etcd_get(const char* key, char** value, int* modifiedIndex) {
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_value = NULL;
- json_t* js_modifiedIndex = NULL;
- json_error_t error;
- int res = -1;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- int retVal = -1;
- char *url;
- asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
- free(url);
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- js_modifiedIndex = json_object_get(js_node,
- ETCD_JSON_MODIFIEDINDEX);
-
- if (js_modifiedIndex != NULL && js_value != NULL) {
- if (modifiedIndex) {
- *modifiedIndex = json_integer_value(js_modifiedIndex);
- }
- *value = strdup(json_string_value(js_value));
- retVal = 0;
- }
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
- if(retVal != 0) {
- value = NULL;
- }
- return retVal;
-}
-
-
-static int etcd_get_recursive_values(json_t* js_root, etcd_key_value_callback callback, void *arg, json_int_t *mod_index) {
- json_t *js_nodes;
- if ((js_nodes = json_object_get(js_root, ETCD_JSON_NODES)) != NULL) {
- // subarray
- if (json_is_array(js_nodes)) {
- int len = json_array_size(js_nodes);
- for (int i = 0; i < len; i++) {
- json_t *js_object = json_array_get(js_nodes, i);
- json_t *js_mod_index = json_object_get(js_object, ETCD_JSON_MODIFIEDINDEX);
-
- if(js_mod_index != NULL) {
- json_int_t index = json_integer_value(js_mod_index);
- if(*mod_index < index) {
- *mod_index = index;
- }
- } else {
- printf("[ETCDLIB] Error: No INDEX found for key!\n");
- }
-
- if (json_object_get(js_object, ETCD_JSON_NODES)) {
- // node contains nodes
- etcd_get_recursive_values(js_object, callback, arg, mod_index);
- } else {
- json_t* js_key = json_object_get(js_object, ETCD_JSON_KEY);
- json_t* js_value = json_object_get(js_object, ETCD_JSON_VALUE);
-
- if (js_key && js_value) {
- if (!json_object_get(js_object, ETCD_JSON_DIR)) {
- callback(json_string_value(js_key), json_string_value(js_value), arg);
- }
- } //else empty etcd directory, not an error.
-
- }
- }
- } else {
- printf("[ETCDLIB] Error: misformatted JSON: nodes element is not an array !!\n");
- }
- } else {
- printf("[ETCDLIB] Error: nodes element not found!!\n");
- }
-
- return (*index > 0 ? 0 : 1);
-}
-
-/**
- * etcd_get_directory
- */
-int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void* arg, long long* modifiedIndex) {
- json_t* js_root = NULL;
- json_t* js_rootnode = NULL;
-
- json_error_t error;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- int retVal = 0;
- char *url;
-
- asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory);
-
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
- free(url);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
- if (js_root != NULL) {
- js_rootnode = json_object_get(js_root, ETCD_JSON_NODE);
- } else {
- retVal = -1;
- printf("ERROR ETCD: %s in js_root not found", ETCD_JSON_NODE);
- }
- if (js_rootnode != NULL) {
- *modifiedIndex = 0;
- retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex);
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
-
-/**
- * etcd_set
- */
-int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_value = NULL;
- int retVal = -1;
- char *url;
- size_t req_len = strlen(value) + MAX_OVERHEAD_LENGTH;
- char request[req_len];
- char* requestPtr = request;
- int res;
- struct MemoryStruct reply;
-
- /* Skip leading '/', etcd cannot handle this. */
- while(*key == '/') {
- key++;
- }
-
- reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-
- requestPtr += snprintf(requestPtr, req_len, "value=%s", value);
- if (ttl > 0) {
- requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
- }
-
- if (prevExist) {
- requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true");
- }
- res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
- if(url) {
- free(url);
- }
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
- if (js_node != NULL) {
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- }
- if (js_value != NULL && json_is_string(js_value)) {
- if(strcmp(json_string_value(js_value), value) == 0) {
- retVal = 0;
- }
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
-
-
-/**
- * etcd_set_with_check
- */
-int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write) {
- char *etcd_value;
- int result = 0;
- if (etcd_get(key, &etcd_value, NULL) == 0) {
- if (strcmp(etcd_value, value) != 0) {
- printf("[ETCDLIB] WARNING: value already exists and is different\n");
- printf(" key = %s\n", key);
- printf(" old value = %s\n", etcd_value);
- printf(" new value = %s\n", value);
- result = -1;
- }
- free(etcd_value);
- }
- if(always_write || !result) {
- result = etcd_set(key, value, ttl, false);
- }
- return result;
-}
-
-
-/**
- * etcd_watch
- */
-int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- json_t* js_prevNode = NULL;
- json_t* js_action = NULL;
- json_t* js_value = NULL;
- json_t* js_rkey = NULL;
- json_t* js_prevValue = NULL;
- json_t* js_modIndex = NULL;
- int retVal = -1;
- char *url = NULL;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- if (index != 0)
- asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index);
- else
- asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
- res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
- if(url)
- free(url);
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
-
- if (js_root != NULL) {
- js_action = json_object_get(js_root, ETCD_JSON_ACTION);
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
- retVal = 0;
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if (js_node != NULL) {
- js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
- js_value = json_object_get(js_node, ETCD_JSON_VALUE);
- js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
- }
- if (js_prevNode != NULL) {
- js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
- }
- if ((prevValue != NULL) && (js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-
- *prevValue = strdup(json_string_value(js_prevValue));
- }
- if(modifiedIndex != NULL) {
- if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
- *modifiedIndex = json_integer_value(js_modIndex);
- } else {
- *modifiedIndex = index;
- }
- }
- if ((rkey != NULL) && (js_rkey != NULL) && (json_is_string(js_rkey))) {
- *rkey = strdup(json_string_value(js_rkey));
-
- }
- if ((action != NULL) && (js_action != NULL) && (json_is_string(js_action))) {
- *action = strdup(json_string_value(js_action));
- }
- if ((value != NULL) && (js_value != NULL) && (json_is_string(js_value))) {
- *value = strdup(json_string_value(js_value));
- }
- if (js_root != NULL) {
- json_decref(js_root);
- }
-
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
-
-/**
- * etcd_del
- */
-int etcd_del(const char* key) {
- json_error_t error;
- json_t* js_root = NULL;
- json_t* js_node = NULL;
- int retVal = -1;
- char *url;
- int res;
- struct MemoryStruct reply;
-
- reply.memory = malloc(1); /* will be grown as needed by the realloc above */
- reply.size = 0; /* no data at this point */
-
- asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key);
- res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply);
- free(url);
-
- if (res == CURLE_OK) {
- js_root = json_loads(reply.memory, 0, &error);
- if (js_root != NULL) {
- js_node = json_object_get(js_root, ETCD_JSON_NODE);
- }
-
- if (js_node != NULL) {
- retVal = 0;
- }
-
- if (js_root != NULL) {
- json_decref(js_root);
- }
- }
-
- if (reply.memory) {
- free(reply.memory);
- }
-
- return retVal;
-}
-
-
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
- size_t realsize = size * nmemb;
- struct MemoryStruct *mem = (struct MemoryStruct *) userp;
-
- mem->memory = realloc(mem->memory, mem->size + realsize + 1);
- if (mem->memory == NULL) {
- /* out of memory! */
- printf("not enough memory (realloc returned NULL)\n");
- return 0;
- }
-
- memcpy(&(mem->memory[mem->size]), contents, realsize);
- mem->size += realsize;
- mem->memory[mem->size] = 0;
-
- return realsize;
-}
-
-static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
- CURL *curl = NULL;
- CURLcode res = 0;
- curl = curl_easy_init();
- curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
- curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
- curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
- curl_easy_setopt(curl, CURLOPT_URL, url);
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
-
- if (request == PUT) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
- curl_easy_setopt(curl, CURLOPT_POST, 1L);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
- } else if (request == DELETE) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
- } else if (request == GET) {
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
- }
-
- res = curl_easy_perform(curl);
- curl_easy_cleanup(curl);
- return res;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
index ca39b85..43a175d 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
@@ -22,6 +22,7 @@ include_directories("${CURL_INCLUDE_DIR}")
include_directories("${JANSSON_INCLUDE_DIR}")
include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
include_directories("private/include")
include_directories("public/include")
@@ -35,8 +36,7 @@ add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
private/src/etcd_watcher.c
private/src/etcd_writer.c
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
- ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/etcd.c
)
-target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils etcdlib ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)
http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
index 16102b0..a53a844 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
@@ -25,13 +25,13 @@
#include "celix_log.h"
#include "constants.h"
+#include <curl/curl.h>
#include "etcd.h"
#include "etcd_watcher.h"
#include "pubsub_discovery.h"
#include "pubsub_discovery_impl.h"
-
#define MAX_ROOTNODE_LENGTH 128
#define MAX_LOCALNODE_LENGTH 4096
#define MAX_FIELD_LENGTH 128
@@ -70,7 +70,7 @@ celix_status_t etcdCommon_init(bundle_context_pt context) {
printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
- if (etcd_init(etcd_server, etcd_port) != 0) {
+ if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
status = CELIX_BUNDLE_EXCEPTION;
} else {
status = CELIX_SUCCESS;