You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2019/12/09 19:56:44 UTC
[celix] 01/02: Improves debug logging for etcdlib, pubsub etcd disc & json serializer
This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
commit e0016f047457824ff7ffca82ac688c05f23607a4
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Mon Dec 9 20:24:56 2019 +0100
Improves debug logging for etcdlib, pubsub etcd disc & json serializer
---
.../pubsub_discovery/src/pubsub_discovery_impl.c | 29 ++--
.../pubsub_discovery/src/pubsub_discovery_impl.h | 1 +
.../src/pubsub_serializer_impl.c | 150 +++++++++++----------
libs/etcdlib/api/etcdlib.h | 10 ++
libs/etcdlib/src/etcd.c | 18 ++-
5 files changed, 124 insertions(+), 84 deletions(-)
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 3eefff1..7e847c5 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -83,7 +83,7 @@ pubsub_discovery_t* pubsub_discovery_create(celix_bundle_context_t *context, log
long etcdPort = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_SERVER_PORT_KEY, PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT);
long ttl = celix_bundleContext_getPropertyAsLong(context, PUBSUB_DISCOVERY_ETCD_TTL_KEY, PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT);
- etcd_init(etcdIp, (int)etcdPort, ETCDLIB_NO_CURL_INITIALIZATION);
+ disc->etcdlib = etcdlib_create(etcdIp, etcdPort, ETCDLIB_NO_CURL_INITIALIZATION);
disc->ttlForEntries = (int)ttl;
disc->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
disc->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
@@ -122,6 +122,11 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
celixThreadMutex_destroy(&ps_discovery->runningMutex);
+ if (ps_discovery->etcdlib != NULL) {
+ etcdlib_destroy(ps_discovery->etcdlib);
+ ps_discovery->etcdlib = NULL;
+ }
+
free(ps_discovery);
return status;
@@ -142,7 +147,7 @@ static void psd_watchSetupConnection(pubsub_discovery_t *disc, bool *connectedPt
if (disc->verbose) {
printf("[PSD] Reading etcd directory at %s\n", disc->pubsubPath);
}
- int rc = etcd_get_directory(disc->pubsubPath, psd_etcdReadCallback, disc, mIndex);
+ int rc = etcdlib_get_directory(disc->etcdlib, disc->pubsubPath, psd_etcdReadCallback, disc, mIndex);
if (rc == ETCDLIB_RC_OK) {
*connectedPtr = true;
} else {
@@ -159,8 +164,8 @@ static void psd_watchForChange(pubsub_discovery_t *disc, bool *connectedPtr, lon
char *action = NULL;
char *value = NULL;
char *readKey = NULL;
- //TODO add interruptable etcd_wait -> which returns a handle to interrupt and a can be used for a wait call
- int rc = etcd_watch(disc->pubsubPath, watchIndex, &action, NULL, &value, &readKey, mIndex);
+ //TODO add interruptable etcdlib_wait -> which returns a handle to interrupt and a can be used for a wait call
+ int rc = etcdlib_watch(disc->etcdlib, disc->pubsubPath, watchIndex, &action, NULL, &value, &readKey, mIndex);
if (rc == ETCDLIB_RC_ERROR) {
L_ERROR("[PSD] Communicating with etcd. rc is %i, action value is %s\n", rc, action);
*connectedPtr = false;
@@ -269,7 +274,7 @@ void* psd_refresh(void *data) {
pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry->isSet) {
//only refresh ttl -> no index update -> no watch trigger
- int rc = etcd_refresh(entry->key, disc->ttlForEntries);
+ int rc = etcdlib_refresh(disc->etcdlib, entry->key, disc->ttlForEntries);
if (rc != ETCDLIB_RC_OK) {
L_WARN("[PSD] Warning: Cannot refresh etcd key %s\n", entry->key);
entry->isSet = false;
@@ -279,7 +284,7 @@ void* psd_refresh(void *data) {
}
} else {
char *str = pubsub_discovery_createJsonEndpoint(entry->properties);
- int rc = etcd_set(entry->key, str, disc->ttlForEntries, false);
+ int rc = etcdlib_set(disc->etcdlib, entry->key, str, disc->ttlForEntries, false);
if (rc == ETCDLIB_RC_OK) {
entry->isSet = true;
entry->setCount += 1;
@@ -353,7 +358,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_t *disc) {
while (hashMapIterator_hasNext(&iter)) {
pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry->isSet) {
- etcd_del(entry->key);
+ etcdlib_del(disc->etcdlib, entry->key);
}
free(entry->key);
celix_properties_destroy(entry->properties);
@@ -446,7 +451,7 @@ celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_propert
if (entry != NULL) {
if (entry->isSet) {
- etcd_del(entry->key);
+ etcdlib_del(disc->etcdlib, entry->key);
}
free(entry->key);
celix_properties_destroy(entry->properties);
@@ -584,6 +589,14 @@ celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine
//TODO add support for query (scope / topic)
fprintf(os, "\n");
+ fprintf(os, "Discovery configuration:\n");
+ fprintf(os, " |- etcd host = %s\n", etcdlib_host(disc->etcdlib));
+ fprintf(os, " |- etcd port = %i\n", etcdlib_port(disc->etcdlib));
+ fprintf(os, " |- entries ttl = %i seconds\n", disc->ttlForEntries);
+ fprintf(os, " |- entries refresh time = %i seconds\n", disc->sleepInsecBetweenTTLRefresh);
+ fprintf(os, " |- pubsub discovery path = %s\n", disc->pubsubPath);
+
+ fprintf(os, "\n");
fprintf(os, "Discovered Endpoints:\n");
celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints);
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index 7052a19..6cc7151 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -70,6 +70,7 @@ typedef struct pubsub_discovery {
//configurable by config/env.
const char *pubsubPath;
bool verbose;
+ etcdlib_t *etcdlib;
int ttlForEntries;
int sleepInsecBetweenTTLRefresh;
const char *fwUUID;
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 7810607..112d6e7 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -47,17 +47,27 @@ FILE_INPUT_TYPE;
struct pubsub_json_serializer {
celix_bundle_context_t *bundle_context;
- log_helper_t *loghelper;
+ log_helper_t *log;
};
+#define L_DEBUG(...) \
+ logHelper_log(serializer->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+ logHelper_log(serializer->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+ logHelper_log(serializer->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+ logHelper_log(serializer->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+
/* Start of serializer specific functions */
static celix_status_t pubsubMsgSerializer_serialize(void* handle, const void* msg, void** out, size_t *outLen);
static celix_status_t pubsubMsgSerializer_deserialize(void* handle, const void* input, size_t inputLen, void **out);
static void pubsubMsgSerializer_freeMsg(void* handle, void *msg);
-static FILE* openFileStream(FILE_INPUT_TYPE file_input_type, const char* filename, const char* root, /*output*/ char* avpr_fqn, /*output*/ char* path);
+static FILE* openFileStream(pubsub_json_serializer_t* serializer, FILE_INPUT_TYPE file_input_type, const char* filename, const char* root, /*output*/ char* avpr_fqn, /*output*/ char* path);
static FILE_INPUT_TYPE getFileInputType(const char* filename);
-static bool readPropertiesFile(const char* properties_file_name, const char* root, /*output*/ char* avpr_fqn, /*output*/ char* path);
+static bool readPropertiesFile(pubsub_json_serializer_t* serializer, const char* properties_file_name, const char* root, /*output*/ char* avpr_fqn, /*output*/ char* path);
typedef struct pubsub_json_msg_serializer_impl {
dyn_message_type *msgType;
@@ -68,11 +78,11 @@ typedef struct pubsub_json_msg_serializer_impl {
} pubsub_json_msg_serializer_impl_t;
static char* pubsubSerializer_getMsgDescriptionDir(celix_bundle_t *bundle);
-static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_bundle_t *bundle, hash_map_pt msgSerializers);
-static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers,celix_bundle_t *bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t* serializer, const char *root, celix_bundle_t *bundle, hash_map_pt msgSerializers);
+static void pubsubSerializer_fillMsgSerializerMap(pubsub_json_serializer_t* serializer, hash_map_pt msgSerializers,celix_bundle_t *bundle);
-static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer);
-static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn);
+static int pubsubMsgSerializer_convertDescriptor(pubsub_json_serializer_t* serializer, FILE* file_ptr, pubsub_msg_serializer_t* msgSerializer);
+static int pubsubMsgSerializer_convertAvpr(pubsub_json_serializer_t *serializer, FILE* file_ptr, pubsub_msg_serializer_t* msgSerializer, const char* fqn);
static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -81,7 +91,7 @@ static void dfi_log(void *handle, int level, const char *file, int line, const c
va_start(ap, msg);
vasprintf(&logStr, msg, ap);
va_end(ap);
- logHelper_log(serializer->loghelper, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+ logHelper_log(serializer->log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
free(logStr);
}
@@ -97,8 +107,8 @@ celix_status_t pubsubSerializer_create(celix_bundle_context_t *context, pubsub_j
(*serializer)->bundle_context= context;
- if (logHelper_create(context, &(*serializer)->loghelper) == CELIX_SUCCESS) {
- logHelper_start((*serializer)->loghelper);
+ if (logHelper_create(context, &(*serializer)->log) == CELIX_SUCCESS) {
+ logHelper_start((*serializer)->log);
jsonSerializer_logSetup(dfi_log, (*serializer), 1);
dynFunction_logSetup(dfi_log, (*serializer), 1);
dynType_logSetup(dfi_log, (*serializer), 1);
@@ -113,8 +123,8 @@ celix_status_t pubsubSerializer_create(celix_bundle_context_t *context, pubsub_j
celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
celix_status_t status = CELIX_SUCCESS;
- logHelper_stop(serializer->loghelper);
- logHelper_destroy(&serializer->loghelper);
+ logHelper_stop(serializer->log);
+ logHelper_destroy(&serializer->log);
free(serializer);
@@ -122,22 +132,12 @@ celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
}
celix_status_t pubsubSerializer_createSerializerMap(void *handle, celix_bundle_t *bundle, hash_map_pt* serializerMap) {
- celix_status_t status = CELIX_SUCCESS;
pubsub_json_serializer_t *serializer = handle;
hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
-
- if (map != NULL) {
- pubsubSerializer_fillMsgSerializerMap(map, bundle);
- } else {
- logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
- status = CELIX_ENOMEM;
- }
-
- if (status == CELIX_SUCCESS) {
- *serializerMap = map;
- }
- return status;
+ pubsubSerializer_fillMsgSerializerMap(serializer, map, bundle);
+ *serializerMap = map;
+ return CELIX_SUCCESS;
}
celix_status_t pubsubSerializer_destroySerializerMap(void* handle __attribute__((unused)), hash_map_pt serializerMap) {
@@ -210,7 +210,7 @@ void pubsubMsgSerializer_freeMsg(void* handle, void *msg) {
}
-static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, celix_bundle_t *bundle) {
+static void pubsubSerializer_fillMsgSerializerMap(pubsub_json_serializer_t* serializer, hash_map_pt msgSerializers, celix_bundle_t *bundle) {
char* root = NULL;
char* metaInfPath = NULL;
@@ -219,8 +219,8 @@ static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, ce
if (root != NULL) {
asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
- pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
- pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+ pubsubSerializer_addMsgSerializerFromBundle(serializer, root, bundle, msgSerializers);
+ pubsubSerializer_addMsgSerializerFromBundle(serializer, metaInfPath, bundle, msgSerializers);
free(metaInfPath);
free(root);
@@ -253,12 +253,11 @@ static char* pubsubSerializer_getMsgDescriptionDir(celix_bundle_t *bundle) {
return root;
}
-static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_bundle_t *bundle, hash_map_pt msgSerializers) {
+static void pubsubSerializer_addMsgSerializerFromBundle(pubsub_json_serializer_t* serializer, const char *root, celix_bundle_t *bundle, hash_map_pt msgSerializers) {
char fqn[MAX_PATH_LEN];
- char path[MAX_PATH_LEN];
+ char pathOrError[MAX_PATH_LEN];
const char* entry_name = NULL;
FILE_INPUT_TYPE fileInputType;
- FILE* stream = NULL;
const struct dirent *entry = NULL;
DIR* dir = opendir(root);
@@ -267,12 +266,18 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_
}
for (; entry != NULL; entry = readdir(dir)) {
+ FILE* stream = NULL;
entry_name = entry->d_name;
- printf("DMU: Parsing entry '%s'\n", entry_name);
fileInputType = getFileInputType(entry_name);
- stream = openFileStream(fileInputType, entry_name, root, /*out*/fqn, /*out*/path);
+ if (fileInputType != FIT_INVALID) {
+ L_DEBUG("[json serializer] Parsing entry '%s'\n", entry_name);
+ stream = openFileStream(serializer, fileInputType, entry_name, root, /*out*/fqn, /*out*/pathOrError);
+ if (!stream) {
+ L_WARN("[json serializer] Cannot open descriptor file: '%s'\n", pathOrError);
+ }
+ }
+
if (!stream) {
- printf("DMU: Cannot open descriptor file: '%s'.\n", path);
continue; // Go to next entry in directory
}
@@ -282,15 +287,15 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_
int translation_result = -1;
if (fileInputType == FIT_DESCRIPTOR) {
- translation_result = pubsubMsgSerializer_convertDescriptor(stream, msgSerializer);
+ translation_result = pubsubMsgSerializer_convertDescriptor(serializer, stream, msgSerializer);
}
else if (fileInputType == FIT_AVPR) {
- translation_result = pubsubMsgSerializer_convertAvpr(stream, msgSerializer, fqn);
+ translation_result = pubsubMsgSerializer_convertAvpr(serializer, stream, msgSerializer, fqn);
}
fclose(stream);
if (translation_result != 0) {
- printf("DMU: could not create serializer for '%s'\n", entry_name);
+ L_WARN("[json serializer] Could not craete serializer for '%s'\n", entry_name);
free(impl);
free(msgSerializer);
continue;
@@ -298,12 +303,12 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_
// serializer has been constructed, try to put in the map
if (hashMap_containsKey(msgSerializers, (void *) (uintptr_t) msgSerializer->msgId)) {
- printf("Cannot add msg %s. clash in msg id %d!!\n", msgSerializer->msgName, msgSerializer->msgId);
+ L_WARN("Cannot add msg %s. Clash is msg id %d!\n", msgSerializer->msgName, msgSerializer->msgId);
dynMessage_destroy(impl->msgType);
free(msgSerializer);
free(impl);
} else if (msgSerializer->msgId == 0) {
- printf("Cannot add msg %s. clash in msg id %d!!\n", msgSerializer->msgName, msgSerializer->msgId);
+ L_WARN("Cannot add msg %s. Clash is msg id %d!\n", msgSerializer->msgName, msgSerializer->msgId);
dynMessage_destroy(impl->msgType);
free(msgSerializer);
free(impl);
@@ -318,27 +323,24 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_
}
}
-static FILE* openFileStream(FILE_INPUT_TYPE file_input_type, const char* filename, const char* root, char* avpr_fqn, char* path) {
+static FILE* openFileStream(pubsub_json_serializer_t *serializer, FILE_INPUT_TYPE file_input_type, const char* filename, const char* root, char* avpr_fqn, char* pathOrError) {
FILE* result = NULL;
- memset(path, 0, MAX_PATH_LEN);
+ memset(pathOrError, 0, MAX_PATH_LEN);
switch (file_input_type) {
case FIT_INVALID:
- snprintf(path, MAX_PATH_LEN, "Because %s is not a valid file", filename);
+ snprintf(pathOrError, MAX_PATH_LEN, "Because %s is not a valid file", filename);
break;
-
case FIT_DESCRIPTOR:
- snprintf(path, MAX_PATH_LEN, "%s/%s", root, filename);
- result = fopen(path, "r");
+ snprintf(pathOrError, MAX_PATH_LEN, "%s/%s", root, filename);
+ result = fopen(pathOrError, "r");
break;
-
case FIT_AVPR:
- if (readPropertiesFile(filename, root, avpr_fqn, path)) {
- result = fopen(path, "r");
+ if (readPropertiesFile(serializer, filename, root, avpr_fqn, pathOrError)) {
+ result = fopen(pathOrError, "r");
}
break;
-
default:
- printf("DMU: Unknown file input type, returning NULL!\n");
+ L_WARN("[json serializer] Unknown file input type, returning NULL!\n");
break;
}
@@ -357,11 +359,11 @@ static FILE_INPUT_TYPE getFileInputType(const char* filename) {
}
}
-static bool readPropertiesFile(const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {
+static bool readPropertiesFile(pubsub_json_serializer_t* serializer, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {
snprintf(path, MAX_PATH_LEN, "%s/%s", root, properties_file_name); // use path to create path to properties file
FILE *properties = fopen(path, "r");
if (!properties) {
- printf("DMU: Could not find or open %s as a properties file in %s\n", properties_file_name, root);
+ L_WARN("[json serializer] Could not find or open %s as a properties file in %s\n", properties_file_name, root);
return false;
}
@@ -383,23 +385,23 @@ static bool readPropertiesFile(const char* properties_file_name, const char* roo
fclose(properties);
if (*avpr_fqn == '\0') {
- printf("CMU: File %s does not contain a fully qualified name for the parser\n", properties_file_name);
+ L_WARN("[json serializer] File %s does not contain a fully qualified name for the parser\n", properties_file_name);
return false;
}
if (*path == '\0') {
- printf("CMU: File %s does not contain a location for the avpr file\n", properties_file_name);
+ L_WARN("[json serializer] File %s does not contain a location for the avpr file\n", properties_file_name);
return false;
}
return true;
}
-static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer) {
+static int pubsubMsgSerializer_convertDescriptor(pubsub_json_serializer_t* serializer, FILE* file_ptr, pubsub_msg_serializer_t* msgSerializer) {
dyn_message_type *msgType = NULL;
int rc = dynMessage_parse(file_ptr, &msgType);
if (rc != 0 || msgType == NULL) {
- printf("DMU: cannot parse message from descriptor.\n");
+ L_WARN("[json serializer] Cannot parse message from descriptor.\n");
return -1;
}
@@ -410,7 +412,7 @@ static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_seri
rc += dynMessage_getVersion(msgType, &msgVersion);
if (rc != 0 || msgName == NULL || msgVersion == NULL) {
- printf("DMU: cannot retrieve name and/or version from msg\n");
+ L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
return -1;
}
@@ -432,29 +434,29 @@ static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_seri
msgId = utils_stringHash(msgName);
}
- pubsub_json_msg_serializer_impl_t *handle = (pubsub_json_msg_serializer_impl_t*)serializer->handle;
+ pubsub_json_msg_serializer_impl_t *handle = (pubsub_json_msg_serializer_impl_t*)msgSerializer->handle;
handle->msgType = msgType;
handle->msgId = msgId;
handle->msgName = msgName;
handle->msgVersion = msgVersion;
- serializer->msgId = handle->msgId;
- serializer->msgName = handle->msgName;
- serializer->msgVersion = handle->msgVersion;
+ msgSerializer->msgId = handle->msgId;
+ msgSerializer->msgName = handle->msgName;
+ msgSerializer->msgVersion = handle->msgVersion;
- serializer->serialize = (void*) pubsubMsgSerializer_serialize;
- serializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
- serializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+ msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
+ msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
+ msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
return 0;
}
-static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn) {
+static int pubsubMsgSerializer_convertAvpr(pubsub_json_serializer_t *serializer, FILE* file_ptr, pubsub_msg_serializer_t* msgSerializer, const char* fqn) {
if (!file_ptr || !fqn || !serializer) return -2;
dyn_message_type* msgType = dynMessage_parseAvpr(file_ptr, fqn);
if (!msgType) {
- printf("DMU: cannot parse avpr file for '%s'\n", fqn);
+ L_WARN("[json serializer] Cannot parse avpr file '%s'\n", fqn);
return -1;
}
@@ -467,7 +469,7 @@ static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer
celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
if (s != CELIX_SUCCESS || !msgName) {
- printf("DMU: cannot retrieve name and/or version from msg\n");
+ L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
if (s == CELIX_SUCCESS) {
version_destroy(msgVersion);
}
@@ -487,19 +489,19 @@ static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer
msgId = utils_stringHash(msgName);
}
- pubsub_json_msg_serializer_impl_t *handle = (pubsub_json_msg_serializer_impl_t*) serializer->handle;
+ pubsub_json_msg_serializer_impl_t *handle = (pubsub_json_msg_serializer_impl_t*) msgSerializer->handle;
handle->msgType = msgType;
handle->msgId = msgId;
handle->msgName = msgName;
handle->msgVersion = msgVersion;
- serializer->msgId = handle->msgId;
- serializer->msgName = handle->msgName;
- serializer->msgVersion = handle->msgVersion;
+ msgSerializer->msgId = handle->msgId;
+ msgSerializer->msgName = handle->msgName;
+ msgSerializer->msgVersion = handle->msgVersion;
- serializer->serialize = (void*) pubsubMsgSerializer_serialize;
- serializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
- serializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+ msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
+ msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
+ msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
return 0;
}
diff --git a/libs/etcdlib/api/etcdlib.h b/libs/etcdlib/api/etcdlib.h
index e1e6c7d..b3b70aa 100644
--- a/libs/etcdlib/api/etcdlib.h
+++ b/libs/etcdlib/api/etcdlib.h
@@ -66,6 +66,16 @@ etcdlib_t* etcdlib_create(const char* server, int port, int flags);
void etcdlib_destroy(etcdlib_t *etcdlib);
/**
+ * Returns the configured etcd host for etcdlib.
+ */
+const char* etcdlib_host(etcdlib_t *etcdlib);
+
+/**
+ * Returns the configured etcd port for etcdlib.
+ */
+int etcdlib_port(etcdlib_t *etcdlib);
+
+/**
* @desc Retrieve a single value from Etcd.
* @param const etcdlib_t* etcdlib. The ETCD-LIB instance (contains hostname and port info).
* @param const char* key. The Etcd-key (Note: a leading '/' should be avoided).
diff --git a/libs/etcdlib/src/etcd.c b/libs/etcdlib/src/etcd.c
index 54517e6..97cc861 100644
--- a/libs/etcdlib/src/etcd.c
+++ b/libs/etcdlib/src/etcd.c
@@ -116,6 +116,14 @@ void etcdlib_destroy(etcdlib_t *etcdlib) {
free(etcdlib);
}
+const char* etcdlib_host(etcdlib_t *etcdlib) {
+ return etcdlib->host;
+}
+
+int etcdlib_port(etcdlib_t *etcdlib) {
+ return etcdlib->port;
+}
+
int etcd_get(const char* key, char** value, int* modifiedIndex) {
return etcdlib_get(&g_etcdlib, key, value, modifiedIndex);
}
@@ -660,7 +668,13 @@ static int performRequest(char* url, request_t request, void* reqData, void* rep
}
res = curl_easy_perform(curl);
- curl_easy_cleanup(curl);
- return res;
+
+ if (res != CURLE_OK && res != CURLE_OPERATION_TIMEDOUT) {
+ const char* m = request == GET ? "GET" : request == PUT ? "PUT" : request == DELETE ? "DELETE" : "?";
+ fprintf(stderr, "[etclib] Curl error for %s @ %s: %s\n", url, m, curl_easy_strerror(res));
+ }
+
+ curl_easy_cleanup(curl);
+ return res;
}