You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by mi...@apache.org on 2013/05/20 00:08:08 UTC
svn commit: r1484357 - in /zookeeper/trunk: ./ src/c/include/ src/c/src/ src/c/tests/
Author: michim
Date: Sun May 19 22:08:08 2013
New Revision: 1484357
URL: http://svn.apache.org/r1484357
Log:
ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer (Marshall McMullen via michim)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/c/include/zookeeper.h
zookeeper/trunk/src/c/include/zookeeper_log.h
zookeeper/trunk/src/c/src/load_gen.c
zookeeper/trunk/src/c/src/mt_adaptor.c
zookeeper/trunk/src/c/src/zk_adaptor.h
zookeeper/trunk/src/c/src/zk_log.c
zookeeper/trunk/src/c/src/zookeeper.c
zookeeper/trunk/src/c/tests/PthreadMocks.h
zookeeper/trunk/src/c/tests/TestClient.cc
zookeeper/trunk/src/c/tests/TestOperations.cc
zookeeper/trunk/src/c/tests/Util.h
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun May 19 22:08:08 2013
@@ -12,7 +12,10 @@ NEW FEATURES:
ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)
ZOOKEEPER-107. Allow dynamic changes to server cluster membership (Alex Shraer via breed)
-
+
+ ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
+ (Marshall McMullen via michim)
+
BUGFIXES:
ZOOKEEPER-786. Exception in ZooKeeper.toString
Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Sun May 19 22:08:08 2013
@@ -427,6 +427,14 @@ typedef void (*watcher_fn)(zhandle_t *zh
int state, const char *path,void *watcherCtx);
/**
+ * \brief typedef for setting the log callback. It's a function pointer which
+ * returns void and accepts a const char* as its only argument.
+ *
+ * \param message message to be passed to the callback function.
+ */
+typedef void (*log_callback_fn)(const char *message);
+
+/**
* \brief create a handle to used communicate with zookeeper.
*
* This method creates a new handle and a zookeeper session that corresponds
@@ -458,6 +466,45 @@ ZOOAPI zhandle_t *zookeeper_init(const c
int recv_timeout, const clientid_t *clientid, void *context, int flags);
/**
+ * \brief create a handle to communicate with zookeeper.
+ *
+ * This function is identical to \ref zookeeper_init except it allows one
+ * to specify an additional callback to be used for all logging for that
+ * specific connection. For more details on the logging callback see
+ * \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ *
+ * This method creates a new handle and a zookeeper session that corresponds
+ * to that handle. Session establishment is asynchronous, meaning that the
+ * session should not be considered established until (and unless) an
+ * event of state ZOO_CONNECTED_STATE is received.
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \param fn the global watcher callback function. When notifications are
+ * triggered this function will be invoked.
+ * \param clientid the id of a previously established session that this
+ * client will be reconnecting to. Pass 0 if not reconnecting to a previous
+ * session. Clients can access the session id of an established, valid,
+ * connection by calling \ref zoo_client_id. If the session corresponding to
+ * the specified clientid has expired, or if the clientid is invalid for
+ * any reason, the returned zhandle_t will be invalid -- the zhandle_t
+ * state will indicate the reason for failure (typically
+ * ZOO_EXPIRED_SESSION_STATE).
+ * \param context the handback object that will be associated with this instance
+ * of zhandle_t. Application can access it (for example, in the watcher
+ * callback) using \ref zoo_get_context. The object is not used by zookeeper
+ * internally and can be null.
+ * \param flags reserved for future use. Should be set to zero.
+ * \param log_callback All log messages will be passed to this callback function.
+ * For more details see \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ * \return a pointer to the opaque zhandle structure. If it fails to create
+ * a new zhandle the function returns NULL and the errno variable
+ * indicates the reason.
+ */
+ZOOAPI zhandle_t *zookeeper_init2(const char *host, watcher_fn fn,
+ int recv_timeout, const clientid_t *clientid, void *context, int flags,
+ log_callback_fn log_callback);
+
+/**
* \brief update the list of servers this client will connect to.
*
* This method allows a client to update the connection string by providing
@@ -1408,6 +1455,32 @@ ZOOAPI void zoo_set_debug_level(ZooLogLe
ZOOAPI void zoo_set_log_stream(FILE* logStream);
/**
+ * \brief gets the callback to be used by this connection for logging.
+ *
+ * This is a per-connection logging mechanism that will take priority over
+ * the library-wide default log stream. That is, zookeeper library will first
+ * try to use a per-connection callback if available and if not, will fallback
+ * to using the logging stream. Passing in NULL resets the callback and will
+ * cause it to then fallback to using the logging stream as described in \ref
+ * zoo_set_log_stream.
+ */
+ZOOAPI log_callback_fn zoo_get_log_callback(const zhandle_t *zh);
+
+/**
+ * \brief sets the callback to be used by the library for logging
+ *
+ * Setting this callback has the effect of overriding the default log stream.
+ * Zookeeper will first try to use a per-connection callback if available
+ * and if not, will fallback to using the logging stream. Passing in NULL
+ * resets the callback and will cause it to then fallback to using the logging
+ * stream as described in \ref zoo_set_log_stream.
+ *
+ * Note: The provided callback will be invoked by multiple threads and therefore
+ * it needs to be thread-safe.
+ */
+ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);
+
+/**
* \brief enable/disable quorum endpoint order randomization
*
* Note: typically this method should NOT be used outside of testing.
Modified: zookeeper/trunk/src/c/include/zookeeper_log.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper_log.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper_log.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper_log.h Sun May 19 22:08:08 2013
@@ -26,23 +26,22 @@ extern "C" {
#endif
extern ZOOAPI ZooLogLevel logLevel;
-#define LOGSTREAM getLogStream()
+#define LOGCALLBACK(_zh) zoo_get_log_callback(_zh)
+#define LOGSTREAM NULL
-#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
- log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x)
-#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
- log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x)
-#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
- log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x)
-#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
- log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+#define LOG_ERROR(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
+ log_message(_cb, ZOO_LOG_LEVEL_ERROR, __LINE__, __func__, __VA_ARGS__)
+#define LOG_WARN(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
+ log_message(_cb, ZOO_LOG_LEVEL_WARN, __LINE__, __func__, __VA_ARGS__)
+#define LOG_INFO(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
+ log_message(_cb, ZOO_LOG_LEVEL_INFO, __LINE__, __func__, __VA_ARGS__)
+#define LOG_DEBUG(_cb, ...) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
+ log_message(_cb, ZOO_LOG_LEVEL_DEBUG, __LINE__, __func__, __VA_ARGS__)
-ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName,
- const char* message);
+ZOOAPI void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+ int line, const char* funcName, const char* format, ...);
-ZOOAPI const char* format_log_message(const char* format,...);
-
-FILE* getLogStream();
+FILE* zoo_get_log_stream();
#ifdef __cplusplus
}
Modified: zookeeper/trunk/src/c/src/load_gen.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/load_gen.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/load_gen.c (original)
+++ zookeeper/trunk/src/c/src/load_gen.c Sun May 19 22:08:08 2013
@@ -88,7 +88,7 @@ void listener(zhandle_t *zzh, int type,
void create_completion(int rc, const char *name, const void *data) {
incCounter(-1);
if(rc!=ZOK){
- LOG_ERROR(("Failed to create a node rc=%d",rc));
+ LOG_ERROR(LOGSTREAM, "Failed to create a node rc=%d",rc);
}
}
@@ -102,7 +102,7 @@ int doCreateNodes(const char* root, int
rc=zoo_acreate(zh, nodeName, "first", 5, &ZOO_OPEN_ACL_UNSAFE, 0,
create_completion, 0);
if(i%1000==0){
- LOG_INFO(("Created %s",nodeName));
+ LOG_INFO(LOGSTREAM, "Created %s", nodeName);
}
if(rc!=ZOK) return rc;
}
@@ -116,7 +116,7 @@ int createRoot(const char* root){
void write_completion(int rc, const struct Stat *stat, const void *data) {
incCounter(-1);
if(rc!=ZOK){
- LOG_ERROR(("Failed to write a node rc=%d",rc));
+ LOG_ERROR(LOGSTREAM, "Failed to write a node rc=%d",rc);
}
}
@@ -137,13 +137,13 @@ void read_completion(int rc, const char
const struct Stat *stat, const void *data) {
incCounter(-1);
if(rc!=ZOK){
- LOG_ERROR(("Failed to read a node rc=%d",rc));
+ LOG_ERROR(LOGSTREAM, "Failed to read a node rc=%d",rc);
return;
}
if(memcmp(value,"second",6)!=0){
char buf[value_len+1];
memcpy(buf,value,value_len);buf[value_len]=0;
- LOG_ERROR(("Invalid read, expected [second], received [%s]\n",buf));
+ LOG_ERROR(LOGSTREAM, "Invalid read, expected [second], received [%s]\n",buf);
exit(1);
}
}
@@ -198,7 +198,7 @@ int recursiveDelete(const char* root){
int rc=zoo_get_children(zh,root,0,&children);
if(rc!=ZNONODE){
if(rc!=ZOK){
- LOG_ERROR(("Failed to get children of %s, rc=%d",root,rc));
+ LOG_ERROR(LOGSTREAM, "Failed to get children of %s, rc=%d",root,rc);
return rc;
}
for(i=0;i<children.count; i++){
@@ -214,10 +214,10 @@ int recursiveDelete(const char* root){
free_String_vector(&children);
}
if(deletedCounter%1000==0)
- LOG_INFO(("Deleting %s",root));
+ LOG_INFO(LOGSTREAM, "Deleting %s",root);
rc=zoo_delete(zh,root,-1);
if(rc!=ZOK){
- LOG_ERROR(("Failed to delete znode %s, rc=%d",root,rc));
+ LOG_ERROR(LOGSTREAM, "Failed to delete znode %s, rc=%d",root,rc);
}else
deletedCounter++;
return rc;
@@ -245,15 +245,15 @@ int main(int argc, char **argv) {
if (!zh)
return errno;
- LOG_INFO(("Checking server connection..."));
+ LOG_INFO(LOGSTREAM, "Checking server connection...");
ensureConnected();
if(cleaning==1){
int rc = 0;
deletedCounter=0;
rc=recursiveDelete(argv[2]);
if(rc==ZOK){
- LOG_INFO(("Succesfully deleted a subtree starting at %s (%d nodes)",
- argv[2],deletedCounter));
+ LOG_INFO(LOGSTREAM, "Succesfully deleted a subtree starting at %s (%d nodes)",
+ argv[2],deletedCounter);
exit(0);
}
exit(1);
@@ -262,18 +262,18 @@ int main(int argc, char **argv) {
createRoot(argv[2]);
while(1) {
ensureConnected();
- LOG_INFO(("Creating children for path %s",argv[2]));
+ LOG_INFO(LOGSTREAM, "Creating children for path %s",argv[2]);
doCreateNodes(argv[2],nodeCount);
waitCounter();
- LOG_INFO(("Starting the write cycle for path %s",argv[2]));
+ LOG_INFO(LOGSTREAM, "Starting the write cycle for path %s",argv[2]);
doWrites(argv[2],nodeCount);
waitCounter();
- LOG_INFO(("Starting the read cycle for path %s",argv[2]));
+ LOG_INFO(LOGSTREAM, "Starting the read cycle for path %s",argv[2]);
doReads(argv[2],nodeCount);
waitCounter();
- LOG_INFO(("Starting the delete cycle for path %s",argv[2]));
+ LOG_INFO(LOGSTREAM, "Starting the delete cycle for path %s",argv[2]);
doDeletes(argv[2],nodeCount);
waitCounter();
}
Modified: zookeeper/trunk/src/c/src/mt_adaptor.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/mt_adaptor.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/mt_adaptor.c (original)
+++ zookeeper/trunk/src/c/src/mt_adaptor.c Sun May 19 22:08:08 2013
@@ -116,7 +116,7 @@ unsigned __stdcall do_completion( void *
int handle_error(SOCKET sock, char* message)
{
- LOG_ERROR(("%s. %d",message, WSAGetLastError()));
+ LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError());
closesocket (sock);
return -1;
}
@@ -131,7 +131,7 @@ int create_socket_pair(SOCKET fds[2])
SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP);
if (lst == INVALID_SOCKET ){
- LOG_ERROR(("Error creating socket. %d",WSAGetLastError()));
+ LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
return -1;
}
memset(&inaddr, 0, sizeof(inaddr));
@@ -218,7 +218,7 @@ void start_threads(zhandle_t* zh)
// use api_prolog() to make sure zhandle doesn't get destroyed
// while initialization is in progress
api_prolog(zh);
- LOG_DEBUG(("starting threads..."));
+ LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");
rc=pthread_create(&adaptor->io, 0, do_io, zh);
assert("pthread_create() failed for the IO thread"&&!rc);
rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
@@ -232,17 +232,17 @@ int adaptor_init(zhandle_t *zh)
pthread_mutexattr_t recursive_mx_attr;
struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads));
if (!adaptor_threads) {
- LOG_ERROR(("Out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
return -1;
}
/* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
#ifdef WIN32
if (create_socket_pair(adaptor_threads->self_pipe) == -1){
- LOG_ERROR(("Can't make a socket."));
+ LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
#else
if(pipe(adaptor_threads->self_pipe)==-1) {
- LOG_ERROR(("Can't make a pipe %d",errno));
+ LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
#endif
free(adaptor_threads);
return -1;
@@ -365,7 +365,7 @@ void *do_io(void *v)
api_prolog(zh);
notify_thread_ready(zh);
- LOG_DEBUG(("started IO thread"));
+ LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
fds[0].fd=adaptor_threads->self_pipe[0];
fds[0].events=POLLIN;
while(!zh->close_requested) {
@@ -400,7 +400,7 @@ void *do_io(void *v)
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
api_prolog(zh);
notify_thread_ready(zh);
- LOG_DEBUG(("started IO thread"));
+ LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds);
while(!zh->close_requested) {
struct timeval tv;
@@ -444,7 +444,7 @@ void *do_io(void *v)
break;
}
api_epilog(zh, 0);
- LOG_DEBUG(("IO thread terminated"));
+ LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
return 0;
}
@@ -457,7 +457,7 @@ void *do_completion(void *v)
zhandle_t *zh = v;
api_prolog(zh);
notify_thread_ready(zh);
- LOG_DEBUG(("started completion thread"));
+ LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
while(!zh->close_requested) {
pthread_mutex_lock(&zh->completions_to_process.lock);
while(!zh->completions_to_process.head && !zh->close_requested) {
@@ -467,7 +467,7 @@ void *do_completion(void *v)
process_completions(zh);
}
api_epilog(zh, 0);
- LOG_DEBUG(("completion thread terminated"));
+ LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
return 0;
}
Modified: zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ zookeeper/trunk/src/c/src/zk_adaptor.h Sun May 19 22:08:08 2013
@@ -223,6 +223,7 @@ struct _zhandle {
clientid_t client_id; // client-id
long long last_zxid; // last zookeeper ID
auth_list_head_t auth_h; // authentication data list
+ log_callback_fn log_callback; // Callback for logging (falls back to logging to stderr)
// Primer storage
struct _buffer_list primer_buffer; // The buffer used for the handshake at the start of a connection
Modified: zookeeper/trunk/src/c/src/zk_log.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_log.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_log.c (original)
+++ zookeeper/trunk/src/c/src/zk_log.c Sun May 19 22:08:08 2013
@@ -86,7 +86,7 @@ char* get_format_log_buffer(){
ZooLogLevel logLevel=ZOO_LOG_LEVEL_INFO;
static FILE* logStream=0;
-FILE* getLogStream(){
+FILE* zoo_get_log_stream(){
if(logStream==0)
logStream=stderr;
return logStream;
@@ -122,44 +122,64 @@ static const char* time_now(char* now_st
return now_str;
}
-void log_message(ZooLogLevel curLevel,int line,const char* funcName,
- const char* message)
+void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+ int line, const char* funcName, const char* format, ...)
{
static const char* dbgLevelStr[]={"ZOO_INVALID","ZOO_ERROR","ZOO_WARN",
"ZOO_INFO","ZOO_DEBUG"};
+
+ char* buf = get_format_log_buffer();
+ if(!buf)
+ {
+ fprintf(stderr, "log_message: Unable to allocate memory buffer");
+ return;
+ }
+
static pid_t pid=0;
+
+ if(pid==0)
+ {
+ pid=getpid();
+ }
+
#ifdef WIN32
char timebuf [TIME_NOW_BUF_SIZE];
+ const char* time = time_now(timebuf);
+#else
+ const char* time = time_now(get_time_buffer());
#endif
- if(pid==0)pid=getpid();
+
#ifndef THREADED
- fprintf(LOGSTREAM, "%s:%d:%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
- dbgLevelStr[curLevel],funcName,line,message);
-#else
-#ifdef WIN32
- fprintf(LOGSTREAM, "%s:%d(0x%lx):%s@%s@%d: %s\n", time_now(timebuf),pid,
- (unsigned long int)(pthread_self().thread_id),
- dbgLevelStr[curLevel],funcName,line,message);
+
+ int ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE,
+ "%s:%d:%s@%s@%d: ", time, pid,
+ dbgLevelStr[curLevel], funcName, line);
#else
- fprintf(LOGSTREAM, "%s:%d(0x%lx):%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
- (unsigned long int)pthread_self(),
- dbgLevelStr[curLevel],funcName,line,message);
-#endif
+
+ #ifdef WIN32
+ unsigned long int tid = (unsigned long int)(pthread_self().thread_id);
+ #else
+ unsigned long int tid = (unsigned long int)(pthread_self());
+ #endif
+
+ int ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE-1,
+ "%s:%d(0x%lx):%s@%s@%d: ", time, pid, tid,
+ dbgLevelStr[curLevel], funcName, line);
#endif
- fflush(LOGSTREAM);
-}
-const char* format_log_message(const char* format,...)
-{
+ // Now grab the actual message out of the variadic arg list
va_list va;
- char* buf=get_format_log_buffer();
- if(!buf)
- return "format_log_message: Unable to allocate memory buffer";
-
- va_start(va,format);
- vsnprintf(buf, FORMAT_LOG_BUF_SIZE-1,format,va);
- va_end(va);
- return buf;
+ va_start(va, format);
+ vsnprintf(buf+ofs, FORMAT_LOG_BUF_SIZE-1-ofs, format, va);
+ va_end(va);
+
+ if (callback)
+ {
+ callback(buf);
+ } else {
+ fprintf(zoo_get_log_stream(), "%s\n", buf);
+ fflush(zoo_get_log_stream());
+ }
}
void zoo_set_debug_level(ZooLogLevel level)
Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Sun May 19 22:08:08 2013
@@ -184,14 +184,14 @@ static int queue_session_event(zhandle_t
static const char* format_endpoint_info(const struct sockaddr_storage* ep);
/* deserialize forward declarations */
-static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
-static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *ia);
+static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
+static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia);
/* completion routine forward declarations */
static int add_completion(zhandle_t *zh, int xid, int completion_type,
const void *dc, const void *data, int add_to_front,
watcher_registration_t* wo, completion_head_t *clist);
-static completion_list_t* create_completion_entry(int xid, int completion_type,
+static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
const void *dc, const void *data, watcher_registration_t* wo,
completion_head_t *clist);
static void destroy_completion_entry(completion_list_t* c);
@@ -484,7 +484,7 @@ static int count_hosts(char *hosts)
* The contents of the provided address vector will be initialized to an
* empty state.
*/
-int resolve_hosts(const char *hosts_in, addrvec_t *avec)
+static int resolve_hosts(const zhandle_t *zh, const char *hosts_in, addrvec_t *avec)
{
int rc = ZOK;
char *host = NULL;
@@ -492,7 +492,7 @@ int resolve_hosts(const char *hosts_in,
int num_hosts = 0;
char *strtok_last = NULL;
- if (hosts_in == NULL || avec == NULL) {
+ if (zh == NULL || hosts_in == NULL || avec == NULL) {
return ZBADARGUMENTS;
}
@@ -501,7 +501,7 @@ int resolve_hosts(const char *hosts_in,
hosts = strdup(hosts_in);
if (hosts == NULL) {
- LOG_ERROR(("out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "out of memory");
errno=ENOMEM;
rc=ZSYSTEMERROR;
goto fail;
@@ -516,7 +516,7 @@ int resolve_hosts(const char *hosts_in,
// Allocate list inside avec
rc = addrvec_alloc_capacity(avec, num_hosts);
if (rc != 0) {
- LOG_ERROR(("out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "out of memory");
errno=ENOMEM;
rc=ZSYSTEMERROR;
goto fail;
@@ -528,7 +528,7 @@ int resolve_hosts(const char *hosts_in,
char *end_port_spec;
int port;
if (!port_spec) {
- LOG_ERROR(("no port in %s", host));
+ LOG_ERROR(LOGCALLBACK(zh), "no port in %s", host);
errno=EINVAL;
rc=ZBADARGUMENTS;
goto fail;
@@ -537,7 +537,7 @@ int resolve_hosts(const char *hosts_in,
port_spec++;
port = strtol(port_spec, &end_port_spec, 0);
if (!*port_spec || *end_port_spec || port == 0) {
- LOG_ERROR(("invalid port in %s", host));
+ LOG_ERROR(LOGCALLBACK(zh), "invalid port in %s", host);
errno=EINVAL;
rc=ZBADARGUMENTS;
goto fail;
@@ -552,7 +552,7 @@ int resolve_hosts(const char *hosts_in,
he = gethostbyname(host);
if (!he) {
- LOG_ERROR(("could not resolve %s", host));
+ LOG_ERROR(LOGCALLBACK(zh), "could not resolve %s", host);
errno=ENOENT;
rc=ZBADARGUMENTS;
goto fail;
@@ -563,7 +563,7 @@ int resolve_hosts(const char *hosts_in,
if (addrs->count == addrs->capacity) {
rc = addrvec_grow_default(addrs);
if (rc != 0) {
- LOG_ERROR(("out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "out of memory");
errno=ENOMEM;
rc=ZSYSTEMERROR;
goto fail;
@@ -591,8 +591,8 @@ int resolve_hosts(const char *hosts_in,
}
#endif
else {
- LOG_WARN(("skipping unknown address family %x for %s",
- addr->ss_family, hosts_in));
+ LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
+ addr->ss_family, hosts_in);
}
}
host = strtok_r(0, ",", &strtok_last);
@@ -635,11 +635,11 @@ int resolve_hosts(const char *hosts_in,
if (rc != 0) {
errno = getaddrinfo_errno(rc);
#ifdef WIN32
- LOG_ERROR(("Win32 message: %s\n", gai_strerror(rc)));
+ LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
#elif __linux__ && __GNUC__
- LOG_ERROR(("getaddrinfo: %s\n", gai_strerror(rc)));
+ LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
#else
- LOG_ERROR(("getaddrinfo: %s\n", strerror(errno)));
+ LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", strerror(errno));
#endif
rc=ZSYSTEMERROR;
goto fail;
@@ -651,7 +651,7 @@ int resolve_hosts(const char *hosts_in,
if (avec->count == avec->capacity) {
rc = addrvec_grow_default(avec);
if (rc != 0) {
- LOG_ERROR(("out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "out of memory");
errno=ENOMEM;
rc=ZSYSTEMERROR;
goto fail;
@@ -667,8 +667,8 @@ int resolve_hosts(const char *hosts_in,
addrvec_append_addrinfo(avec, res);
break;
default:
- LOG_WARN(("skipping unknown address family %x for %s",
- res->ai_family, hosts_in));
+ LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
+ res->ai_family, hosts_in);
break;
}
}
@@ -753,7 +753,7 @@ int update_addrs(zhandle_t *zh)
goto fail;
}
- rc = resolve_hosts(hosts, &resolved);
+ rc = resolve_hosts(zh, hosts, &resolved);
if (rc != ZOK)
{
goto fail;
@@ -892,7 +892,7 @@ struct sockaddr* zookeeper_get_connected
return addr;
}
-static void log_env() {
+static void log_env(zhandle_t *zh) {
char buf[2048];
#ifdef HAVE_SYS_UTSNAME_H
struct utsname utsname;
@@ -904,72 +904,82 @@ static void log_env() {
uid_t uid = 0;
#endif
- LOG_INFO(("Client environment:zookeeper.version=%s", PACKAGE_STRING));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:zookeeper.version=%s", PACKAGE_STRING);
#ifdef HAVE_GETHOSTNAME
gethostname(buf, sizeof(buf));
- LOG_INFO(("Client environment:host.name=%s", buf));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=%s", buf);
#else
- LOG_INFO(("Client environment:host.name=<not implemented>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=<not implemented>");
#endif
#ifdef HAVE_SYS_UTSNAME_H
uname(&utsname);
- LOG_INFO(("Client environment:os.name=%s", utsname.sysname));
- LOG_INFO(("Client environment:os.arch=%s", utsname.release));
- LOG_INFO(("Client environment:os.version=%s", utsname.version));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=%s", utsname.sysname);
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=%s", utsname.release);
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=%s", utsname.version);
#else
- LOG_INFO(("Client environment:os.name=<not implemented>"));
- LOG_INFO(("Client environment:os.arch=<not implemented>"));
- LOG_INFO(("Client environment:os.version=<not implemented>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=<not implemented>");
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=<not implemented>");
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=<not implemented>");
#endif
#ifdef HAVE_GETLOGIN
- LOG_INFO(("Client environment:user.name=%s", getlogin()));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=%s", getlogin());
#else
- LOG_INFO(("Client environment:user.name=<not implemented>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=<not implemented>");
#endif
#if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
uid = getuid();
if (!getpwuid_r(uid, &pw, buf, sizeof(buf), &pwp) && pwp) {
- LOG_INFO(("Client environment:user.home=%s", pw.pw_dir));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=%s", pw.pw_dir);
} else {
- LOG_INFO(("Client environment:user.home=<NA>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<NA>");
}
#else
- LOG_INFO(("Client environment:user.home=<not implemented>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<not implemented>");
#endif
#ifdef HAVE_GETCWD
if (!getcwd(buf, sizeof(buf))) {
- LOG_INFO(("Client environment:user.dir=<toolong>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<toolong>");
} else {
- LOG_INFO(("Client environment:user.dir=%s", buf));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=%s", buf);
}
#else
- LOG_INFO(("Client environment:user.dir=<not implemented>"));
+ LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<not implemented>");
#endif
}
/**
* Create a zookeeper handle associated with the given host and port.
*/
-zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
- int recv_timeout, const clientid_t *clientid, void *context, int flags)
+static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
+ int recv_timeout, const clientid_t *clientid, void *context, int flags,
+ log_callback_fn log_callback)
{
int errnosave = 0;
zhandle_t *zh = NULL;
char *index_chroot = NULL;
- log_env();
+ // Create our handle
+ zh = calloc(1, sizeof(*zh));
+ if (!zh) {
+ return 0;
+ }
+
+ // Set log callback before calling into log_env
+ zh->log_callback = log_callback;
+ log_env(zh);
+
#ifdef WIN32
- if (Win32WSAStartup()){
- LOG_ERROR(("Error initializing ws2_32.dll"));
- return 0;
- }
+ if (Win32WSAStartup()){
+ LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
+ return 0;
+ }
#endif
- LOG_INFO(("Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
+ LOG_INFO(LOGCALLBACK(zh), "Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
" sessionId=%#llx sessionPasswd=%s context=%p flags=%d",
host,
recv_timeout,
@@ -978,12 +988,8 @@ zhandle_t *zookeeper_init(const char *ho
((clientid == 0) || (clientid->passwd[0] == 0) ?
"<null>" : "<hidden>"),
context,
- flags));
+ flags);
- zh = calloc(1, sizeof(*zh));
- if (!zh) {
- return 0;
- }
zh->hostname = NULL;
zh->fd = -1;
zh->state = ZOO_NOTCONNECTED_STATE;
@@ -1060,6 +1066,19 @@ abort:
return 0;
}
+zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
+ int recv_timeout, const clientid_t *clientid, void *context, int flags)
+{
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL);
+}
+
+zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
+ int recv_timeout, const clientid_t *clientid, void *context, int flags,
+ log_callback_fn log_callback)
+{
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback);
+}
+
/**
* Set a new list of zk servers to connect to. Disconnect will occur if
* current connection endpoint is not in the list.
@@ -1068,7 +1087,7 @@ int zoo_set_servers(zhandle_t *zh, const
{
if (hosts == NULL)
{
- LOG_ERROR(("New server list cannot be empty"));
+ LOG_ERROR(LOGCALLBACK(zh), "New server list cannot be empty");
return ZBADARGUMENTS;
}
@@ -1107,12 +1126,12 @@ static int get_next_server_in_reconfig(z
{
int take_new = drand48() <= zh->pNew;
- LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
+ LOG_DEBUG(LOGCALLBACK(zh), "[OLD] count=%d capacity=%d next=%d hasnext=%d",
zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
- addrvec_hasnext(&zh->addrs_old)));
- LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
+ addrvec_hasnext(&zh->addrs_old));
+ LOG_DEBUG(LOGCALLBACK(zh), "[NEW] count=%d capacity=%d next=%d hasnext=%d",
zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
- addrvec_hasnext(&zh->addrs_new)));
+ addrvec_hasnext(&zh->addrs_new));
// Take one of the new servers if we haven't tried them all yet
// and either the probability tells us to connect to one of the new servers
@@ -1121,18 +1140,18 @@ static int get_next_server_in_reconfig(z
&& (take_new || !addrvec_hasnext(&zh->addrs_old)))
{
addrvec_next(&zh->addrs_new, &zh->addr_cur);
- LOG_DEBUG(("Using next from NEW=%s", format_endpoint_info(&zh->addr_cur)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Using next from NEW=%s", format_endpoint_info(&zh->addr_cur));
return 0;
}
// start taking old servers
if (addrvec_hasnext(&zh->addrs_old)) {
addrvec_next(&zh->addrs_old, &zh->addr_cur);
- LOG_DEBUG(("Using next from OLD=%s", format_endpoint_info(&zh->addr_cur)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Using next from OLD=%s", format_endpoint_info(&zh->addr_cur));
return 0;
}
- LOG_DEBUG(("Failed to find either new or old"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Failed to find either new or old");
memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
return 1;
}
@@ -1226,8 +1245,8 @@ char* sub_string(zhandle_t *zh, const ch
return (char *) server_path;
//ZOOKEEPER-1027
if (strncmp(server_path, zh->chroot, strlen(zh->chroot)) != 0) {
- LOG_ERROR(("server path %s does not include chroot path %s",
- server_path, zh->chroot));
+ LOG_ERROR(LOGCALLBACK(zh), "server path %s does not include chroot path %s",
+ server_path, zh->chroot);
return (char *) server_path;
}
if (strlen(server_path) == strlen(zh->chroot)) {
@@ -1546,17 +1565,17 @@ static void handle_error(zhandle_t *zh,i
{
close(zh->fd);
if (is_unrecoverable(zh)) {
- LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
- state2String(zh->state)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
+ state2String(zh->state));
PROCESS_SESSION_EVENT(zh, zh->state);
} else if (zh->state == ZOO_CONNECTED_STATE) {
- LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
}
cleanup_bufs(zh,1,rc);
zh->fd = -1;
- LOG_DEBUG(("Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay));
+ LOG_DEBUG(LOGCALLBACK(zh), "Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay);
// NOTE: If we're at the end of the list of addresses to connect to, then
// we want to delay the next connection attempt to avoid spinning.
@@ -1580,9 +1599,9 @@ static int handle_socket_error_msg(zhand
char buf[1024];
va_start(va,format);
vsnprintf(buf, sizeof(buf)-1,format,va);
- log_message(ZOO_LOG_LEVEL_ERROR,line,__func__,
- format_log_message("Socket [%s] zk retcode=%d, errno=%d(%s): %s",
- zoo_get_current_server(zh),rc,errno,strerror(errno),buf));
+ log_message(LOGCALLBACK(zh), ZOO_LOG_LEVEL_ERROR,line,__func__,
+ "Socket [%s] zk retcode=%d, errno=%d(%s): %s",
+ zoo_get_current_server(zh),rc,errno,strerror(errno),buf);
va_end(va);
}
handle_error(zh,rc);
@@ -1611,11 +1630,11 @@ static void auth_completion_func(int rc,
get_auth_completions(&zh->auth_h, &a_list);
zoo_unlock_auth(zh);
if (rc) {
- LOG_ERROR(("Authentication scheme %s failed. Connection closed.",
- zh->auth_h.auth->scheme));
+ LOG_ERROR(LOGCALLBACK(zh), "Authentication scheme %s failed. Connection closed.",
+ zh->auth_h.auth->scheme);
}
else {
- LOG_INFO(("Authentication scheme %s succeeded", zh->auth_h.auth->scheme));
+ LOG_INFO(LOGCALLBACK(zh), "Authentication scheme %s succeeded", zh->auth_h.auth->scheme);
}
a_tmp = &a_list;
// chain call user's completion function
@@ -1665,7 +1684,7 @@ static int send_auth_info(zhandle_t *zh)
auth = auth->next;
}
zoo_unlock_auth(zh);
- LOG_DEBUG(("Sending all auth info request to %s", zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending all auth info request to %s", zoo_get_current_server(zh));
return (rc <0) ? ZMARSHALLINGERROR:ZOK;
}
@@ -1682,7 +1701,7 @@ static int send_last_auth_info(zhandle_t
}
rc = send_info_packet(zh, auth);
zoo_unlock_auth(zh);
- LOG_DEBUG(("Sending auth info request to %s",zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending auth info request to %s",zoo_get_current_server(zh));
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
@@ -1729,7 +1748,7 @@ static int send_set_watches(zhandle_t *z
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
- LOG_DEBUG(("Sending set watches request to %s",zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",zoo_get_current_server(zh));
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
@@ -1891,7 +1910,7 @@ int zookeeper_interest(zhandle_t *zh, in
int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
(zh->recv_timeout / 10);
if (time_left > max_exceed)
- LOG_WARN(("Exceeded deadline by %dms", time_left));
+ LOG_WARN(LOGCALLBACK(zh), "Exceeded deadline by %dms", time_left);
}
api_prolog(zh);
@@ -1919,8 +1938,8 @@ int zookeeper_interest(zhandle_t *zh, in
*tv = get_timeval(zh->recv_timeout/60);
zh->delay = 0;
- LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
- zh->hostname));
+ LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
+ zh->hostname);
}
// No need to delay -- grab the next server and attempt connection
@@ -1941,7 +1960,7 @@ int zookeeper_interest(zhandle_t *zh, in
}
ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
if (ssoresult != 0) {
- LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
+ LOG_WARN(LOGCALLBACK(zh), "Unable to set TCP_NODELAY, operation latency may be effected");
}
#ifdef WIN32
ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
@@ -1953,7 +1972,7 @@ int zookeeper_interest(zhandle_t *zh, in
rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
} else {
#else
- LOG_DEBUG(("[zk] connect()\n"));
+ LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
{
#endif
rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
@@ -1977,7 +1996,7 @@ int zookeeper_interest(zhandle_t *zh, in
if((rc=prime_connection(zh))!=0)
return api_epilog(zh,rc);
- LOG_INFO(("Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
+ LOG_INFO(LOGCALLBACK(zh), "Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
}
*tv = get_timeval(zh->recv_timeout/3);
}
@@ -2014,11 +2033,11 @@ int zookeeper_interest(zhandle_t *zh, in
if (zh->state==ZOO_CONNECTED_STATE) {
send_to = zh->recv_timeout/3 - idle_send;
if (send_to <= 0 && zh->sent_requests.head==0) {
-// LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
-// zoo_get_current_server(zh),-send_to));
+// LOG_DEBUG(LOGCALLBACK(zh), "Sending PING to %s (exceeded idle by %dms)",
+// zoo_get_current_server(zh),-send_to);
rc = send_ping(zh);
if (rc < 0){
- LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
+ LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
return api_epilog(zh,rc);
}
send_to = zh->recv_timeout/3;
@@ -2064,7 +2083,7 @@ static int check_events(zhandle_t *zh, i
if((rc=prime_connection(zh))!=0)
return rc;
- LOG_INFO(("initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
+ LOG_INFO(LOGCALLBACK(zh), "initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
return ZOK;
}
if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
@@ -2110,15 +2129,15 @@ static int check_events(zhandle_t *zh, i
sizeof(zh->client_id.passwd));
zh->state = ZOO_CONNECTED_STATE;
zh->reconfig = 0;
- LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
+ LOG_INFO(LOGCALLBACK(zh), "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
format_endpoint_info(&zh->addr_cur),
- newid, zh->recv_timeout));
+ newid, zh->recv_timeout);
/* we want the auth to be sent for, but since both call push to front
we need to call send_watch_set first */
send_set_watches(zh);
/* send the authentication packet now */
send_auth_info(zh);
- LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
}
@@ -2176,7 +2195,7 @@ static int queue_session_event(zhandle_t
completion_list_t *cptr;
if ((oa=create_buffer_oarchive())==NULL) {
- LOG_ERROR(("out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "out of memory");
goto error;
}
rc = serialize_ReplyHeader(oa, "hdr", &hdr);
@@ -2185,7 +2204,7 @@ static int queue_session_event(zhandle_t
close_buffer_oarchive(&oa, 1);
goto error;
}
- cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
+ cptr = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
cptr->buffer->curr_offset = get_buffer_len(oa);
if (!cptr->buffer) {
@@ -2223,14 +2242,13 @@ completion_list_t *dequeue_completion(co
return cptr;
}
-static void process_sync_completion(
+static void process_sync_completion(zhandle_t *zh,
completion_list_t *cptr,
struct sync_completion *sc,
- struct iarchive *ia,
- zhandle_t *zh)
+ struct iarchive *ia)
{
- LOG_DEBUG(("Processing sync_completion with type=%d xid=%#x rc=%d",
- cptr->c.type, cptr->xid, sc->rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
+ cptr->c.type, cptr->xid, sc->rc);
switch(cptr->c.type) {
case COMPLETION_DATA:
@@ -2334,15 +2352,15 @@ static void process_sync_completion(
case COMPLETION_VOID:
break;
case COMPLETION_MULTI:
- sc->rc = deserialize_multi(cptr->xid, cptr, ia);
+ sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
break;
default:
- LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
+ LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
break;
}
}
-static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *ia)
+static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia)
{
int rc = 0;
completion_head_t *clist = &cptr->c.clist;
@@ -2362,7 +2380,7 @@ static int deserialize_multi(int xid, co
}
}
- deserialize_response(entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
+ deserialize_response(zh, entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
deserialize_MultiHeader(ia, "multiheader", &mhdr);
//While deserializing the response we must destroy completion entry for each operation in
//the zoo_multi transaction. Otherwise this results in memory leak when client invokes zoo_multi
@@ -2373,12 +2391,12 @@ static int deserialize_multi(int xid, co
return rc;
}
-static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
+static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
{
switch (type) {
case COMPLETION_DATA:
- LOG_DEBUG(("Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.data_result(rc, 0, 0, 0, cptr->data);
} else {
@@ -2390,8 +2408,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_STAT:
- LOG_DEBUG(("Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.stat_result(rc, 0, cptr->data);
} else {
@@ -2402,8 +2420,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_STRINGLIST:
- LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.strings_result(rc, 0, cptr->data);
} else {
@@ -2414,8 +2432,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_STRINGLIST_STAT:
- LOG_DEBUG(("Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
} else {
@@ -2426,8 +2444,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_STRING:
- LOG_DEBUG(("Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.string_result(rc, 0, cptr->data);
} else {
@@ -2438,8 +2456,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_STRING_STAT:
- LOG_DEBUG(("Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.string_stat_result(rc, 0, 0, cptr->data);
} else {
@@ -2450,8 +2468,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_ACLLIST:
- LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
if (failed) {
cptr->c.acl_result(rc, 0, 0, cptr->data);
} else {
@@ -2462,8 +2480,8 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_VOID:
- LOG_DEBUG(("Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
if (xid == PING_XID) {
// We want to skip the ping
} else {
@@ -2472,14 +2490,14 @@ static void deserialize_response(int typ
}
break;
case COMPLETION_MULTI:
- LOG_DEBUG(("Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
- cptr->xid, failed, rc));
- rc = deserialize_multi(xid, cptr, ia);
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
+ cptr->xid, failed, rc);
+ rc = deserialize_multi(zh, xid, cptr, ia);
assert(cptr->c.void_result);
cptr->c.void_result(rc, cptr->data);
break;
default:
- LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
+ LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
}
}
@@ -2503,13 +2521,13 @@ void process_completions(zhandle_t *zh)
type = evt.type;
state = evt.state;
/* This is a notification so there aren't any pending requests */
- LOG_DEBUG(("Calling a watcher for node [%s], type = %d event=%s",
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for node [%s], type = %d event=%s",
(evt.path==NULL?"NULL":evt.path), cptr->c.type,
- watcherEvent2String(type)));
+ watcherEvent2String(type));
deliverWatchers(zh,type,state,evt.path, &cptr->c.watcher_result);
deallocate_WatcherEvent(&evt);
} else {
- deserialize_response(cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
+ deserialize_response(zh, cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
}
destroy_completion_entry(cptr);
close_buffer_iarchive(&ia);
@@ -2552,7 +2570,7 @@ static void checkResponseLatency(zhandle
gettimeofday(&now,0);
delay=calculate_interval(&zh->socket_readable, &now);
if(delay>20)
- LOG_DEBUG(("The following server response has spent at least %dms sitting in the client socket recv buffer",delay));
+ LOG_DEBUG(LOGCALLBACK(zh), "The following server response has spent at least %dms sitting in the client socket recv buffer",delay);
zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
}
@@ -2591,13 +2609,13 @@ int zookeeper_process(zhandle_t *zh, int
char *path = NULL;
completion_list_t *c = NULL;
- LOG_DEBUG(("Processing WATCHER_EVENT"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Processing WATCHER_EVENT");
deserialize_WatcherEvent(ia, "event", &evt);
type = evt.type;
path = evt.path;
/* We are doing a notification, so there is no pending request */
- c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
+ c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
c->buffer = bptr;
c->c.watcher_result = collectWatchers(zh, type, path);
@@ -2605,10 +2623,10 @@ int zookeeper_process(zhandle_t *zh, int
deallocate_WatcherEvent(&evt);
queue_completion(&zh->completions_to_process, c, 0);
} else if (hdr.xid == SET_WATCHES_XID) {
- LOG_DEBUG(("Processing SET_WATCHES"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Processing SET_WATCHES");
free_buffer(bptr);
} else if (hdr.xid == AUTH_XID){
- LOG_DEBUG(("Processing AUTH_XID"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Processing AUTH_XID");
/* special handling for the AUTH response as it may come back
* out-of-band */
@@ -2628,14 +2646,14 @@ int zookeeper_process(zhandle_t *zh, int
/* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
if (zh->close_requested == 1 && cptr == NULL) {
- LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Completion queue has been cleared by zookeeper_close()");
close_buffer_iarchive(&ia);
return api_epilog(zh,ZINVALIDSTATE);
}
assert(cptr);
/* The requests are going to come back in order */
if (cptr->xid != hdr.xid) {
- LOG_DEBUG(("Processing unexpected or out-of-order response!"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Processing unexpected or out-of-order response!");
// received unexpected (or out-of-order) response
close_buffer_iarchive(&ia);
@@ -2656,13 +2674,13 @@ int zookeeper_process(zhandle_t *zh, int
struct timeval now;
gettimeofday(&now, 0);
elapsed = calculate_interval(&zh->last_ping, &now);
- LOG_DEBUG(("Got ping response in %d ms", elapsed));
+ LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
// Nothing to do with a ping response
free_buffer(bptr);
destroy_completion_entry(cptr);
} else {
- LOG_DEBUG(("Queueing asynchronous response"));
+ LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
cptr->buffer = bptr;
queue_completion(&zh->completions_to_process, cptr, 0);
@@ -2672,7 +2690,7 @@ int zookeeper_process(zhandle_t *zh, int
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;
- process_sync_completion(cptr, sc, ia, zh);
+ process_sync_completion(zh, cptr, sc, ia);
notify_sync_completion(sc);
free_buffer(bptr);
@@ -2716,12 +2734,12 @@ static void destroy_watcher_registration
}
}
-static completion_list_t* create_completion_entry(int xid, int completion_type,
+static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
const void *dc, const void *data,watcher_registration_t* wo, completion_head_t *clist)
{
completion_list_t *c = calloc(1,sizeof(completion_list_t));
if (!c) {
- LOG_ERROR(("out of memory"));
+ LOG_ERROR(LOGCALLBACK(zh), "out of memory");
return 0;
}
c->c.type = completion_type;
@@ -2808,7 +2826,7 @@ static int add_completion(zhandle_t *zh,
const void *dc, const void *data, int add_to_front,
watcher_registration_t* wo, completion_head_t *clist)
{
- completion_list_t *c =create_completion_entry(xid, completion_type, dc,
+ completion_list_t *c =create_completion_entry(zh, xid, completion_type, dc,
data, wo, clist);
int rc = 0;
if (!c)
@@ -2910,8 +2928,8 @@ int zookeeper_close(zhandle_t *zh)
if(zh->state==ZOO_CONNECTED_STATE){
struct oarchive *oa;
struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
- LOG_INFO(("Closing zookeeper sessionId=%#llx to [%s]\n",
- zh->client_id.client_id,zoo_get_current_server(zh)));
+ LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to [%s]\n",
+ zh->client_id.client_id,zoo_get_current_server(zh));
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
@@ -2927,8 +2945,8 @@ int zookeeper_close(zhandle_t *zh)
* (but reasonable) number of milliseconds since we want the call to block*/
rc=adaptor_send_queue(zh, 3000);
}else{
- LOG_INFO(("Freeing zookeeper resources for sessionId=%#llx\n",
- zh->client_id.client_id));
+ LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n",
+ zh->client_id.client_id);
rc = ZOK;
}
@@ -3060,8 +3078,8 @@ int zoo_awget(zhandle_t *zh, const char
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3104,8 +3122,8 @@ int zoo_awgetconfig(zhandle_t *zh, watch
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3141,7 +3159,7 @@ int zoo_areconfig(zhandle_t *zh, const c
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
@@ -3186,8 +3204,8 @@ int zoo_aset(zhandle_t *zh, const char *
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3266,8 +3284,8 @@ int zoo_acreate(zhandle_t *zh, const cha
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3297,8 +3315,8 @@ int zoo_acreate2(zhandle_t *zh, const ch
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3337,8 +3355,8 @@ int zoo_adelete(zhandle_t *zh, const cha
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3376,8 +3394,8 @@ int zoo_awexists(zhandle_t *zh, const ch
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3409,8 +3427,8 @@ static int zoo_awget_children_(zhandle_t
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3457,8 +3475,8 @@ static int zoo_awget_children2_(zhandle_
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3500,8 +3518,8 @@ int zoo_async(zhandle_t *zh, const char
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3530,8 +3548,8 @@ int zoo_aget_acl(zhandle_t *zh, const ch
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3561,8 +3579,8 @@ int zoo_aset_acl(zhandle_t *zh, const ch
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3655,7 +3673,7 @@ int zoo_amulti(zhandle_t *zh, int count,
result->valuelen = op->create_op.buflen;
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
+ entry = create_completion_entry(zh, h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->create_op.path);
break;
@@ -3667,7 +3685,7 @@ int zoo_amulti(zhandle_t *zh, int count,
rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+ entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->delete_op.path);
break;
@@ -3682,7 +3700,7 @@ int zoo_amulti(zhandle_t *zh, int count,
result->stat = op->set_op.stat;
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
+ entry = create_completion_entry(zh, h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->set_op.path);
break;
@@ -3695,14 +3713,14 @@ int zoo_amulti(zhandle_t *zh, int count,
rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+ entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->check_op.path);
break;
}
default:
- LOG_ERROR(("Unimplemented sub-op type=%d in multi-op", op->type));
+ LOG_ERROR(LOGCALLBACK(zh), "Unimplemented sub-op type=%d in multi-op", op->type);
return ZUNIMPLEMENTED;
}
@@ -3721,8 +3739,8 @@ int zoo_amulti(zhandle_t *zh, int count,
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
- LOG_DEBUG(("Sending multi request xid=%#x with %d subrequests to %s",
- h.xid, index, zoo_get_current_server(zh)));
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
+ h.xid, index, zoo_get_current_server(zh));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
@@ -4013,6 +4031,26 @@ static const char* format_endpoint_info(
return buf;
}
+log_callback_fn zoo_get_log_callback(const zhandle_t* zh)
+{
+ // Verify we have a valid handle
+ if (zh == NULL) {
+ return NULL;
+ }
+
+ return zh->log_callback;
+}
+
+void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback)
+{
+ // Verify we have a valid handle
+ if (zh == NULL) {
+ return;
+ }
+
+ zh->log_callback = callback;
+}
+
void zoo_deterministic_conn_order(int yesOrNo)
{
disable_conn_permute=yesOrNo;
Modified: zookeeper/trunk/src/c/tests/PthreadMocks.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/PthreadMocks.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/PthreadMocks.h (original)
+++ zookeeper/trunk/src/c/tests/PthreadMocks.h Sun May 19 22:08:08 2013
@@ -381,18 +381,18 @@ public:
int ret=LIBC_SYMBOLS.pthread_create(t,a,threadFuncWrapper,
new ThreadContext(f,d));
if(verbose)
- TEST_TRACE(("thread created %p",*t));
+ TEST_TRACE("thread created %p",*t);
return ret;
}
virtual int pthread_join(pthread_t t, void ** r){
- if(verbose) TEST_TRACE(("thread joined %p",t));
+ if(verbose) TEST_TRACE("thread joined %p",t);
int ret=LIBC_SYMBOLS.pthread_join(t,r);
if(ret==0)
markDestroyed(t);
return ret;
}
virtual int pthread_detach(pthread_t t){
- if(verbose) TEST_TRACE(("thread detached %p",t));
+ if(verbose) TEST_TRACE("thread detached %p",t);
int ret=LIBC_SYMBOLS.pthread_detach(t);
if(ret==0)
markDestroyed(t);
Modified: zookeeper/trunk/src/c/tests/TestClient.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestClient.cc?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestClient.cc (original)
+++ zookeeper/trunk/src/c/tests/TestClient.cc Sun May 19 22:08:08 2013
@@ -46,6 +46,13 @@ struct buff_struct_2 {
char *buffer;
};
+// For testing LogMessage Callback functionality
+list<string> logMessages;
+void logMessageHandler(const char* message) {
+ cout << "Log Message Received: [" << message << "]" << endl;
+ logMessages.push_back(message);
+}
+
static int Stat_eq(struct Stat* a, struct Stat* b)
{
if (a->czxid != b->czxid) return 0;
@@ -172,6 +179,7 @@ public:
}
return connected;
}
+
bool waitForDisconnected(zhandle_t *zh) {
time_t expires = time(0) + 15;
while(connected && time(0) < expires) {
@@ -179,11 +187,15 @@ public:
}
return !connected;
}
+
} watchctx_t;
class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
{
CPPUNIT_TEST_SUITE(Zookeeper_simpleSystem);
+ CPPUNIT_TEST(testLogCallbackSet);
+ CPPUNIT_TEST(testLogCallbackInit);
+ CPPUNIT_TEST(testLogCallbackClear);
CPPUNIT_TEST(testAsyncWatcherAutoReset);
CPPUNIT_TEST(testDeserializeString);
#ifdef THREADED
@@ -231,6 +243,13 @@ class Zookeeper_simpleSystem : public CP
return createClient(hostPorts, ctx);
}
+ zhandle_t *createClient(watchctx_t *ctx, log_callback_fn logCallback) {
+ zhandle_t *zk = zookeeper_init2(hostPorts, watcher, 10000, 0, ctx, 0, logCallback);
+ ctx->zh = zk;
+ sleep(1);
+ return zk;
+ }
+
zhandle_t *createClient(const char *hp, watchctx_t *ctx) {
zhandle_t *zk = zookeeper_init(hp, watcher, 10000, 0, ctx, 0);
ctx->zh = zk;
@@ -265,7 +284,6 @@ public:
zoo_set_log_stream(logfile);
}
-
void startServer() {
char cmd[1024];
sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts());
@@ -907,6 +925,69 @@ public:
CPPUNIT_ASSERT_EQUAL(string(path), string(path_buffer));
}
+ // Create a normal handle via zookeeper_init, then install the log callback explicitly.
+ void testLogCallbackSet()
+ {
+ watchctx_t ctx;
+ logMessages.clear(); // start from a known state regardless of test order
+ zhandle_t *zk = createClient(&ctx);
+
+ zoo_set_log_callback(zk, &logMessageHandler);
+ CPPUNIT_ASSERT_EQUAL(&logMessageHandler, zoo_get_log_callback(zk)); // (expected, actual)
+
+ // Log 10 messages and ensure all go to callback
+ const int expected = 10;
+ for (int i = 0; i < expected; i++)
+ {
+ LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, i);
+ }
+ CPPUNIT_ASSERT_EQUAL(expected, static_cast<int>(logMessages.size())); // reports both counts on failure
+ }
+
+ // Create the handle via zookeeper_init2 so that connection-time messages also reach the callback.
+ void testLogCallbackInit()
+ {
+ logMessages.clear();
+ watchctx_t ctx;
+ zhandle_t *zk = createClient(&ctx, &logMessageHandler);
+ CPPUNIT_ASSERT_EQUAL(&logMessageHandler, zoo_get_log_callback(zk)); // (expected, actual)
+
+ // All the connection messages should have gone to the callback -- don't
+ // want this to be a maintenance issue so we're not asserting exact count
+ const int numBefore = static_cast<int>(logMessages.size()); // explicit: size() is unsigned
+ CPPUNIT_ASSERT(numBefore != 0);
+
+ // Log 10 messages and ensure all go to callback
+ const int expected = 10;
+ for (int i = 0; i < expected; i++)
+ {
+ LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, i);
+ }
+ CPPUNIT_ASSERT_EQUAL(numBefore + expected, static_cast<int>(logMessages.size()));
+ }
+
+ // Clearing the log callback: later messages should go to the logstream, not the callback.
+ void testLogCallbackClear()
+ {
+ logMessages.clear();
+ watchctx_t ctx;
+ zhandle_t *zk = createClient(&ctx, &logMessageHandler);
+ CPPUNIT_ASSERT_EQUAL(&logMessageHandler, zoo_get_log_callback(zk)); // (expected, actual)
+
+ // All the connection messages should have gone to the callback -- again, we don't
+ // want this to be a maintenance issue so we're not asserting exact count
+ const int numBefore = static_cast<int>(logMessages.size()); // explicit: size() is unsigned
+ CPPUNIT_ASSERT(numBefore > 0);
+
+ // Clear log_callback
+ zoo_set_log_callback(zk, NULL);
+
+ // Future log messages should go to logstream not callback
+ LOG_INFO(LOGCALLBACK(zk), "%s", __FUNCTION__); // literal format; never pass data as the format string
+ const int numAfter = static_cast<int>(logMessages.size());
+ CPPUNIT_ASSERT_EQUAL(numBefore, numAfter);
+ }
+
void testAsyncWatcherAutoReset()
{
watchctx_t ctx;
Modified: zookeeper/trunk/src/c/tests/TestOperations.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestOperations.cc?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestOperations.cc (original)
+++ zookeeper/trunk/src/c/tests/TestOperations.cc Sun May 19 22:08:08 2013
@@ -490,7 +490,7 @@ public:
break;
}
}
- //TEST_TRACE(("Finished %d iterations",i));
+ //TEST_TRACE("Finished %d iterations",i);
}
virtual void validate(const char* file, int line) const{
CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
@@ -525,7 +525,7 @@ public:
zookeeper_close(lzh);
for(int counter=0; counter<200; counter++){
- TEST_TRACE(("Loop count %d",counter));
+ TEST_TRACE("Loop count %d",counter);
CloseFinally guard(&zh);
@@ -539,7 +539,7 @@ public:
jmgr.startJobsImmediately();
jmgr.wait();
VALIDATE_JOBS(jmgr);
- TEST_TRACE(("run %d finished",counter));
+ TEST_TRACE("run %d finished",counter);
}
}
@@ -564,7 +564,7 @@ public:
void testOperationsAndDisconnectConcurrently1()
{
for(int counter=0; counter<50; counter++){
- //TEST_TRACE(("Loop count %d",counter));
+ //TEST_TRACE("Loop count %d",counter);
// frozen time -- no timeouts and no pings
Mock_gettimeofday timeMock;
Modified: zookeeper/trunk/src/c/tests/Util.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/Util.h?rev=1484357&r1=1484356&r2=1484357&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/Util.h (original)
+++ zookeeper/trunk/src/c/tests/Util.h Sun May 19 22:08:08 2013
@@ -36,8 +36,8 @@
__real_##sym params
// must include "src/zookeeper_log.h" to be able to use this macro
-#define TEST_TRACE(x) \
- log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+#define TEST_TRACE(...) \
+ log_message(LOGSTREAM, ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,__VA_ARGS__) /* standard variadic form; callers pass a format string plus args */
extern const std::string EMPTY_STRING;