You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2017/02/06 18:34:19 UTC

[07/19] celix git commit: CELIX-399: Pubsub discovery using etcdlib now

CELIX-399: Pubsub discovery using etcdlib now


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1c42f1d2
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1c42f1d2
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1c42f1d2

Branch: refs/heads/develop
Commit: 1c42f1d285d5e9c1f095fe79b0f109ccc0b0dda9
Parents: 8c84284
Author: Roy Lenferink <le...@gmail.com>
Authored: Mon Feb 6 17:44:55 2017 +0100
Committer: Roy Lenferink <le...@gmail.com>
Committed: Mon Feb 6 17:44:55 2017 +0100

----------------------------------------------------------------------
 .../pubsub/pubsub_common/public/include/etcd.h  |  39 --
 .../pubsub/pubsub_common/public/src/etcd.c      | 474 -------------------
 .../pubsub/pubsub_discovery/CMakeLists.txt      |   4 +-
 .../pubsub_discovery/private/src/etcd_common.c  |   4 +-
 4 files changed, 4 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h b/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h
deleted file mode 100644
index a502df4..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_H_
-#define ETCD_H_
-
-#include <stdbool.h>
-
-typedef void (*etcd_key_value_callback) (const char *key, const char *value, void* arg);
-
-int etcd_init(const char* server, int port);
-
-int etcd_get(const char* key, char** value, int* modifiedIndex);
-int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void *arg, long long* modifiedIndex);
-
-int etcd_set(const char* key, const char* value, int ttl, bool prevExist);
-int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write);
-
-int etcd_del(const char* key);
-
-int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex);
-
-#endif /* ETCD_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
deleted file mode 100644
index 99ec87a..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdio.h>
-#include <stdbool.h>
-#include <string.h>
-#include <curl/curl.h>
-#include <jansson.h>
-#include "etcd.h"
-
-#define ETCD_JSON_NODE                  "node"
-#define ETCD_JSON_PREVNODE              "prevNode"
-#define ETCD_JSON_NODES                 "nodes"
-#define ETCD_JSON_ACTION                "action"
-#define ETCD_JSON_KEY                   "key"
-#define ETCD_JSON_VALUE                 "value"
-#define ETCD_JSON_DIR                   "dir"
-#define ETCD_JSON_MODIFIEDINDEX         "modifiedIndex"
-
-#define MAX_OVERHEAD_LENGTH           64
-#define DEFAULT_CURL_TIMEOUT          10
-#define DEFAULT_CURL_CONECTTIMEOUT    10
-
-typedef enum {
-	GET, PUT, DELETE
-} request_t;
-
-static const char* etcd_server;
-static int etcd_port = 0;
-
-struct MemoryStruct {
-	char *memory;
-	size_t size;
-};
-
-
-/**
- * Static function declarations
- */
-static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData);
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp);
-/**
- * External function definition
- */
-
-
-/**
- * etcd_init
- */
-int etcd_init(const char* server, int port) {
-        etcd_server = server;
-        etcd_port = port;
-
-        return curl_global_init(CURL_GLOBAL_ALL) != 0;
-}
-
-
-/**
- * etcd_get
- */
-int etcd_get(const char* key, char** value, int* modifiedIndex) {
-    json_t* js_root = NULL;
-    json_t* js_node = NULL;
-    json_t* js_value = NULL;
-    json_t* js_modifiedIndex = NULL;
-    json_error_t error;
-    int res = -1;
-    struct MemoryStruct reply;
-
-    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-    reply.size = 0; /* no data at this point */
-
-    int retVal = -1;
-    char *url;
-    asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-    free(url);
-    if (res == CURLE_OK) {
-        js_root = json_loads(reply.memory, 0, &error);
-
-        if (js_root != NULL) {
-            js_node = json_object_get(js_root, ETCD_JSON_NODE);
-        }
-        if (js_node != NULL) {
-            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-            js_modifiedIndex = json_object_get(js_node,
-            ETCD_JSON_MODIFIEDINDEX);
-
-            if (js_modifiedIndex != NULL && js_value != NULL) {
-                if (modifiedIndex) {
-                    *modifiedIndex = json_integer_value(js_modifiedIndex);
-                }
-                *value = strdup(json_string_value(js_value));
-                retVal = 0;
-            }
-        }
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-    if(retVal != 0) {
-        value = NULL;
-    }
-    return retVal;
-}
-
-
-static int etcd_get_recursive_values(json_t* js_root, etcd_key_value_callback callback, void *arg, json_int_t *mod_index) {
-    json_t *js_nodes;
-    if ((js_nodes = json_object_get(js_root, ETCD_JSON_NODES)) != NULL) {
-        // subarray
-        if (json_is_array(js_nodes)) {
-            int len = json_array_size(js_nodes);
-            for (int i = 0; i < len; i++) {
-                json_t *js_object = json_array_get(js_nodes, i);
-                json_t *js_mod_index = json_object_get(js_object, ETCD_JSON_MODIFIEDINDEX);
-
-                if(js_mod_index != NULL) {
-                    json_int_t index = json_integer_value(js_mod_index);
-                    if(*mod_index < index) {
-                        *mod_index = index;
-                    }
-                } else {
-                    printf("[ETCDLIB] Error: No INDEX found for key!\n");
-                }
-
-                if (json_object_get(js_object, ETCD_JSON_NODES)) {
-                    // node contains nodes
-                    etcd_get_recursive_values(js_object, callback, arg, mod_index);
-                } else {
-                    json_t* js_key = json_object_get(js_object, ETCD_JSON_KEY);
-                    json_t* js_value = json_object_get(js_object, ETCD_JSON_VALUE);
-
-                    if (js_key && js_value) {
-                        if (!json_object_get(js_object, ETCD_JSON_DIR)) {
-                            callback(json_string_value(js_key), json_string_value(js_value), arg);
-                        }
-                    } //else empty etcd directory, not an error.
-
-                }
-            }
-        } else {
-            printf("[ETCDLIB] Error: misformatted JSON: nodes element is not an array !!\n");
-        }
-    } else {
-        printf("[ETCDLIB] Error: nodes element not found!!\n");
-    }
-
-    return (*index > 0 ? 0 : 1);
-}
-
-/**
- * etcd_get_directory
- */
-int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void* arg, long long* modifiedIndex) {
-    json_t* js_root = NULL;
-    json_t* js_rootnode = NULL;
-
-    json_error_t error;
-    int res;
-    struct MemoryStruct reply;
-
-    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-    reply.size = 0; /* no data at this point */
-
-    int retVal = 0;
-    char *url;
-
-    asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory);
-
-    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-    free(url);
-
-    if (res == CURLE_OK) {
-        js_root = json_loads(reply.memory, 0, &error);
-        if (js_root != NULL) {
-            js_rootnode = json_object_get(js_root, ETCD_JSON_NODE);
-        } else {
-            retVal = -1;
-            printf("ERROR ETCD: %s in js_root not found", ETCD_JSON_NODE);
-        }
-        if (js_rootnode != NULL) {
-            *modifiedIndex = 0;
-            retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex);
-        }
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-
-    return retVal;
-}
-
-/**
- * etcd_set
- */
-int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
-    json_error_t error;
-    json_t* js_root = NULL;
-    json_t* js_node = NULL;
-    json_t* js_value = NULL;
-    int retVal = -1;
-    char *url;
-    size_t req_len = strlen(value) + MAX_OVERHEAD_LENGTH;
-    char request[req_len];
-    char* requestPtr = request;
-    int res;
-    struct MemoryStruct reply;
-
-    /* Skip leading '/', etcd cannot handle this. */
-    while(*key == '/') {
-        key++;
-    }
-
-    reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */
-    reply.size = 0; /* no data at this point */
-
-    asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-
-    requestPtr += snprintf(requestPtr, req_len, "value=%s", value);
-    if (ttl > 0) {
-        requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
-    }
-
-    if (prevExist) {
-        requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true");
-    }
-    res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
-    if(url) {
-        free(url);
-    }
-
-    if (res == CURLE_OK) {
-        js_root = json_loads(reply.memory, 0, &error);
-
-        if (js_root != NULL) {
-            js_node = json_object_get(js_root, ETCD_JSON_NODE);
-        }
-        if (js_node != NULL) {
-            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-        }
-        if (js_value != NULL && json_is_string(js_value)) {
-            if(strcmp(json_string_value(js_value), value) == 0) {
-                retVal = 0;
-            }
-        }
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-
-    return retVal;
-}
-
-
-/**
- * etcd_set_with_check
- */
-int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write) {
-    char *etcd_value;
-    int result = 0;
-    if (etcd_get(key, &etcd_value, NULL) == 0) {
-        if (strcmp(etcd_value, value) != 0) {
-            printf("[ETCDLIB] WARNING: value already exists and is different\n");
-            printf("   key       = %s\n", key);
-            printf("   old value = %s\n", etcd_value);
-            printf("   new value = %s\n", value);
-            result = -1;
-        }
-		free(etcd_value);
-    }
-    if(always_write || !result) {
-        result = etcd_set(key, value, ttl, false);
-    }
-    return result;
-}
-
-
-/**
- * etcd_watch
- */
-int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex) {
-    json_error_t error;
-    json_t* js_root = NULL;
-    json_t* js_node = NULL;
-    json_t* js_prevNode = NULL;
-    json_t* js_action = NULL;
-    json_t* js_value = NULL;
-    json_t* js_rkey = NULL;
-    json_t* js_prevValue = NULL;
-    json_t* js_modIndex = NULL;
-    int retVal = -1;
-    char *url = NULL;
-    int res;
-    struct MemoryStruct reply;
-
-    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-    reply.size = 0; /* no data at this point */
-
-    if (index != 0)
-        asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index);
-    else
-        asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
-    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-    if(url)
-        free(url);
-    if (res == CURLE_OK) {
-        js_root = json_loads(reply.memory, 0, &error);
-
-        if (js_root != NULL) {
-            js_action = json_object_get(js_root, ETCD_JSON_ACTION);
-            js_node = json_object_get(js_root, ETCD_JSON_NODE);
-            js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
-            retVal = 0;
-        }
-        if (js_prevNode != NULL) {
-            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-        }
-        if (js_node != NULL) {
-            js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
-            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-            js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
-        }
-        if (js_prevNode != NULL) {
-            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-        }
-        if ((prevValue != NULL) && (js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-
-            *prevValue = strdup(json_string_value(js_prevValue));
-        }
-        if(modifiedIndex != NULL) {
-            if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
-                *modifiedIndex = json_integer_value(js_modIndex);
-            } else {
-                *modifiedIndex = index;
-            }
-        }
-        if ((rkey != NULL) && (js_rkey != NULL) && (json_is_string(js_rkey))) {
-            *rkey = strdup(json_string_value(js_rkey));
-
-        }
-        if ((action != NULL)  && (js_action != NULL)  && (json_is_string(js_action))) {
-            *action = strdup(json_string_value(js_action));
-        }
-        if ((value != NULL) && (js_value != NULL) && (json_is_string(js_value))) {
-            *value = strdup(json_string_value(js_value));
-        }
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-
-    return retVal;
-}
-
-/**
- * etcd_del
- */
-int etcd_del(const char* key) {
-    json_error_t error;
-    json_t* js_root = NULL;
-    json_t* js_node = NULL;
-    int retVal = -1;
-    char *url;
-    int res;
-    struct MemoryStruct reply;
-
-    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-    reply.size = 0; /* no data at this point */
-
-    asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key);
-    res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply);
-    free(url);
-
-    if (res == CURLE_OK) {
-        js_root = json_loads(reply.memory, 0, &error);
-        if (js_root != NULL) {
-            js_node = json_object_get(js_root, ETCD_JSON_NODE);
-        }
-
-        if (js_node != NULL) {
-            retVal = 0;
-        }
-
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-
-    return retVal;
-}
-
-
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
-    size_t realsize = size * nmemb;
-    struct MemoryStruct *mem = (struct MemoryStruct *) userp;
-
-    mem->memory = realloc(mem->memory, mem->size + realsize + 1);
-    if (mem->memory == NULL) {
-        /* out of memory! */
-        printf("not enough memory (realloc returned NULL)\n");
-        return 0;
-    }
-
-    memcpy(&(mem->memory[mem->size]), contents, realsize);
-    mem->size += realsize;
-    mem->memory[mem->size] = 0;
-
-    return realsize;
-}
-
-static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
-    CURL *curl = NULL;
-    CURLcode res = 0;
-    curl = curl_easy_init();
-    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
-    curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
-    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
-    curl_easy_setopt(curl, CURLOPT_URL, url);
-    curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
-    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
-    curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
-
-    if (request == PUT) {
-        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
-        curl_easy_setopt(curl, CURLOPT_POST, 1L);
-        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
-    } else if (request == DELETE) {
-        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
-    } else if (request == GET) {
-        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
-    }
-
-    res = curl_easy_perform(curl);
-    curl_easy_cleanup(curl);
-    return res;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
index ca39b85..43a175d 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
@@ -22,6 +22,7 @@ include_directories("${CURL_INCLUDE_DIR}")
 include_directories("${JANSSON_INCLUDE_DIR}")
 include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
 include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
 include_directories("private/include")
 include_directories("public/include")
 
@@ -35,8 +36,7 @@ add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 		private/src/etcd_watcher.c
 		private/src/etcd_writer.c
 		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/etcd.c
 )
 
-target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils etcdlib ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
 install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)

http://git-wip-us.apache.org/repos/asf/celix/blob/1c42f1d2/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
index 16102b0..a53a844 100644
--- a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
@@ -25,13 +25,13 @@
 #include "celix_log.h"
 #include "constants.h"
 
+#include <curl/curl.h>
 #include "etcd.h"
 #include "etcd_watcher.h"
 
 #include "pubsub_discovery.h"
 #include "pubsub_discovery_impl.h"
 
-
 #define MAX_ROOTNODE_LENGTH		128
 #define MAX_LOCALNODE_LENGTH 	4096
 #define MAX_FIELD_LENGTH		128
@@ -70,7 +70,7 @@ celix_status_t etcdCommon_init(bundle_context_pt context) {
 
     printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
 
-    if (etcd_init(etcd_server, etcd_port) != 0) {
+    if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
         status = CELIX_SUCCESS;