You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2016/12/16 10:45:05 UTC
zookeeper git commit: ZOOKEEPER-761: Remove *synchronous* calls from
the *single-threaded* C client API [Forced Update!]
Repository: zookeeper
Updated Branches:
refs/heads/master c0868332a -> 482ce218d (forced update)
ZOOKEEPER-761: Remove *synchronous* calls from the *single-threaded* C client API
JIRA: https://issues.apache.org/jira/browse/ZOOKEEPER-761
Author: breed <br...@apache.org>
Reviewers: fpj <fp...@apache.org>, hanm <ha...@cloudera.com>, rgs <rg...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/482ce218
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/482ce218
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/482ce218
Branch: refs/heads/master
Commit: 482ce218d815b7a613ed9728b82602ddecc61996
Parents: cd0e323
Author: fpj <fp...@apache.org>
Authored: Fri Dec 16 09:27:25 2016 +0000
Committer: fpj <fp...@apache.org>
Committed: Fri Dec 16 10:43:47 2016 +0000
----------------------------------------------------------------------
src/c/include/zookeeper.h | 70 +++---
src/c/src/cli.c | 15 +-
src/c/src/st_adaptor.c | 18 --
src/c/src/zk_adaptor.h | 2 +
src/c/src/zookeeper.c | 419 ++++++++++++++++++---------------
src/c/tests/TestClient.cc | 5 +
src/c/tests/TestMulti.cc | 4 +-
src/c/tests/TestReadOnlyClient.cc | 4 +-
src/c/tests/TestReconfigServer.cc | 3 +-
9 files changed, 276 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/include/zookeeper.h
----------------------------------------------------------------------
diff --git a/src/c/include/zookeeper.h b/src/c/include/zookeeper.h
index b92acbb..973daa4 100644
--- a/src/c/include/zookeeper.h
+++ b/src/c/include/zookeeper.h
@@ -1505,6 +1505,41 @@ ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);
ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
/**
+ * Type of watchers: used to select which type of watchers should be removed
+ */
+typedef enum {
+ ZWATCHERTYPE_CHILDREN = 1,
+ ZWATCHERTYPE_DATA = 2,
+ ZWATCHERTYPE_ANY = 3
+} ZooWatcherType;
+
+/**
+ * \brief removes the watchers for the given path and watcher type.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the path for which watchers will be removed
+ * \param wtype the watcher type to be removed
+ * \param watcher the watcher to be removed, if null all watchers for that
+ * path (and watcher type) will be removed
+ * \param watcherCtx the contex associated with the watcher to be removed
+ * \param local whether the watchers will be removed locally even if there is
+ * no server connection
+ * \return the return code for the function call.
+ * ZOK - operation completed successfully
+ * ZNOWATCHER - the watcher couldn't be found.
+ * ZINVALIDSTATE - if !local, zhandle state is either ZOO_SESSION_EXPIRED_STATE
+ * or ZOO_AUTH_FAILED_STATE
+ * ZBADARGUMENTS - invalid input parameters
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - a system error occured
+ */
+ZOOAPI int zoo_aremove_watchers(zhandle_t *zh, const char *path,
+ ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local,
+ void_completion_t *completion, const void *data);
+
+
+#ifdef THREADED
+/**
* \brief create a node synchronously.
*
* This method will create a node in ZooKeeper. A node can only be created if
@@ -2002,15 +2037,6 @@ ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version,
ZOOAPI int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results);
/**
- * Type of watchers: used to select which type of watchers should be removed
- */
-typedef enum {
- ZWATCHERTYPE_CHILDREN = 1,
- ZWATCHERTYPE_DATA = 2,
- ZWATCHERTYPE_ANY = 3
-} ZooWatcherType;
-
-/**
* \brief removes the watchers for the given path and watcher type.
*
* \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
@@ -2032,31 +2058,7 @@ typedef enum {
*/
ZOOAPI int zoo_remove_watchers(zhandle_t *zh, const char *path,
ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local);
-
-/**
- * \brief removes the watchers for the given path and watcher type.
- *
- * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
- * \param path the path for which watchers will be removed
- * \param wtype the watcher type to be removed
- * \param watcher the watcher to be removed, if null all watchers for that
- * path (and watcher type) will be removed
- * \param watcherCtx the contex associated with the watcher to be removed
- * \param local whether the watchers will be removed locally even if there is
- * no server connection
- * \return the return code for the function call.
- * ZOK - operation completed successfully
- * ZNOWATCHER - the watcher couldn't be found.
- * ZINVALIDSTATE - if !local, zhandle state is either ZOO_SESSION_EXPIRED_STATE
- * or ZOO_AUTH_FAILED_STATE
- * ZBADARGUMENTS - invalid input parameters
- * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
- * ZSYSTEMERROR - a system error occured
- */
-ZOOAPI int zoo_aremove_watchers(zhandle_t *zh, const char *path,
- ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local,
- void_completion_t *completion, const void *data);
-
+#endif
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/src/cli.c
----------------------------------------------------------------------
diff --git a/src/c/src/cli.c b/src/c/src/cli.c
index 0f451cf..6ca4a41 100644
--- a/src/c/src/cli.c
+++ b/src/c/src/cli.c
@@ -504,13 +504,8 @@ void processline(char *line) {
}
*ptr = '\0';
ptr++;
- if (async) {
- rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion,
- strdup(line));
- } else {
- struct Stat stat;
- rc = zoo_set2(zh, line, ptr, strlen(ptr), -1, &stat);
- }
+ rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion,
+ strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
@@ -579,11 +574,7 @@ void processline(char *line) {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
- if (async) {
- rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line));
- } else {
- rc = zoo_delete(zh, line, -1);
- }
+ rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/src/st_adaptor.c
----------------------------------------------------------------------
diff --git a/src/c/src/st_adaptor.c b/src/c/src/st_adaptor.c
index 0f62966..5e9a4ff 100644
--- a/src/c/src/st_adaptor.c
+++ b/src/c/src/st_adaptor.c
@@ -48,24 +48,6 @@ int unlock_completion_list(completion_head_t *l)
{
return 0;
}
-struct sync_completion *alloc_sync_completion(void)
-{
- return (struct sync_completion*)calloc(1, sizeof(struct sync_completion));
-}
-int wait_sync_completion(struct sync_completion *sc)
-{
- return 0;
-}
-
-void free_sync_completion(struct sync_completion *sc)
-{
- free(sc);
-}
-
-void notify_sync_completion(struct sync_completion *sc)
-{
-}
-
int process_async(int outstanding_sync)
{
return outstanding_sync == 0;
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/src/zk_adaptor.h
----------------------------------------------------------------------
diff --git a/src/c/src/zk_adaptor.h b/src/c/src/zk_adaptor.h
index 74ca4c0..97995e3 100644
--- a/src/c/src/zk_adaptor.h
+++ b/src/c/src/zk_adaptor.h
@@ -274,10 +274,12 @@ struct _zhandle {
int adaptor_init(zhandle_t *zh);
void adaptor_finish(zhandle_t *zh);
void adaptor_destroy(zhandle_t *zh);
+#if THREADED
struct sync_completion *alloc_sync_completion(void);
int wait_sync_completion(struct sync_completion *sc);
void free_sync_completion(struct sync_completion *sc);
void notify_sync_completion(struct sync_completion *sc);
+#endif
int adaptor_send_queue(zhandle_t *zh, int timeout);
int process_async(int outstanding_sync);
void process_completions(zhandle_t *zh);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/src/zookeeper.c
----------------------------------------------------------------------
diff --git a/src/c/src/zookeeper.c b/src/c/src/zookeeper.c
index d36b05a..f6d09b7 100644
--- a/src/c/src/zookeeper.c
+++ b/src/c/src/zookeeper.c
@@ -234,6 +234,13 @@ static __attribute__((unused)) void print_completion_queue(zhandle_t *zh);
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
static int isValidPath(const char* path, const int flags);
+#ifdef THREADED
+static void process_sync_completion(zhandle_t *zh,
+ completion_list_t *cptr,
+ struct sync_completion *sc,
+ struct iarchive *ia);
+#endif
+
#ifdef _WIN32
typedef SOCKET socket_t;
typedef int sendsize_t;
@@ -258,6 +265,15 @@ static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
+/*
+ * abort due to the use of a sync api in a singlethreaded environment
+ */
+static void abort_singlethreaded(zhandle_t *zh)
+{
+ LOG_ERROR(LOGCALLBACK(zh), "Sync completion used without threads");
+ abort();
+}
+
static sendsize_t zookeeper_send(socket_t s, const void* buf, size_t len)
{
return send(s, buf, len, SEND_FLAGS);
@@ -1626,12 +1642,16 @@ void free_completions(zhandle_t *zh,int callCompletion,int reason)
tmp_list.head = cptr->next;
if (cptr->c.data_result == SYNCHRONOUS_MARKER) {
+#ifdef THREADED
struct sync_completion
*sc = (struct sync_completion*)cptr->data;
sc->rc = reason;
notify_sync_completion(sc);
zh->outstanding_sync--;
destroy_completion_entry(cptr);
+#else
+ abort_singlethreaded(zh);
+#endif
} else if (callCompletion) {
// Fake the response
buffer_list_t *bptr;
@@ -2530,124 +2550,6 @@ completion_list_t *dequeue_completion(completion_head_t *list)
return cptr;
}
-static void process_sync_completion(zhandle_t *zh,
- completion_list_t *cptr,
- struct sync_completion *sc,
- struct iarchive *ia)
-{
- LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
- cptr->c.type, cptr->xid, sc->rc);
-
- switch(cptr->c.type) {
- case COMPLETION_DATA:
- if (sc->rc==0) {
- struct GetDataResponse res;
- int len;
- deserialize_GetDataResponse(ia, "reply", &res);
- if (res.data.len <= sc->u.data.buff_len) {
- len = res.data.len;
- } else {
- len = sc->u.data.buff_len;
- }
- sc->u.data.buff_len = len;
- // check if len is negative
- // just of NULL which is -1 int
- if (len == -1) {
- sc->u.data.buffer = NULL;
- } else {
- memcpy(sc->u.data.buffer, res.data.buff, len);
- }
- sc->u.data.stat = res.stat;
- deallocate_GetDataResponse(&res);
- }
- break;
- case COMPLETION_STAT:
- if (sc->rc==0) {
- struct SetDataResponse res;
- deserialize_SetDataResponse(ia, "reply", &res);
- sc->u.stat = res.stat;
- deallocate_SetDataResponse(&res);
- }
- break;
- case COMPLETION_STRINGLIST:
- if (sc->rc==0) {
- struct GetChildrenResponse res;
- deserialize_GetChildrenResponse(ia, "reply", &res);
- sc->u.strs2 = res.children;
- /* We don't deallocate since we are passing it back */
- // deallocate_GetChildrenResponse(&res);
- }
- break;
- case COMPLETION_STRINGLIST_STAT:
- if (sc->rc==0) {
- struct GetChildren2Response res;
- deserialize_GetChildren2Response(ia, "reply", &res);
- sc->u.strs_stat.strs2 = res.children;
- sc->u.strs_stat.stat2 = res.stat;
- /* We don't deallocate since we are passing it back */
- // deallocate_GetChildren2Response(&res);
- }
- break;
- case COMPLETION_STRING:
- if (sc->rc==0) {
- struct CreateResponse res;
- int len;
- const char * client_path;
- deserialize_CreateResponse(ia, "reply", &res);
- //ZOOKEEPER-1027
- client_path = sub_string(zh, res.path);
- len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
- len = sc->u.str.str_len;
- }
- if (len > 0) {
- memcpy(sc->u.str.str, client_path, len - 1);
- sc->u.str.str[len - 1] = '\0';
- }
- free_duplicate_path(client_path, res.path);
- deallocate_CreateResponse(&res);
- }
- break;
- case COMPLETION_STRING_STAT:
- if (sc->rc==0) {
- struct Create2Response res;
- int len;
- const char * client_path;
- deserialize_Create2Response(ia, "reply", &res);
- client_path = sub_string(zh, res.path);
- len = strlen(client_path) + 1;
- if (len > sc->u.str.str_len) {
- len = sc->u.str.str_len;
- }
- if (len > 0) {
- memcpy(sc->u.str.str, client_path, len - 1);
- sc->u.str.str[len - 1] = '\0';
- }
- free_duplicate_path(client_path, res.path);
- sc->u.stat = res.stat;
- deallocate_Create2Response(&res);
- }
- break;
- case COMPLETION_ACLLIST:
- if (sc->rc==0) {
- struct GetACLResponse res;
- deserialize_GetACLResponse(ia, "reply", &res);
- sc->u.acl.acl = res.acl;
- sc->u.acl.stat = res.stat;
- /* We don't deallocate since we are passing it back */
- //deallocate_GetACLResponse(&res);
- }
- break;
- case COMPLETION_VOID:
- break;
- case COMPLETION_MULTI:
- sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
- break;
- default:
- LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
- break;
- }
-}
-
static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia)
{
int rc = 0;
@@ -2967,6 +2869,7 @@ int zookeeper_process(zhandle_t *zh, int events)
cptr->buffer = bptr;
queue_completion(&zh->completions_to_process, cptr, 0);
} else {
+#ifdef THREADED
struct sync_completion
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;
@@ -2977,6 +2880,9 @@ int zookeeper_process(zhandle_t *zh, int events)
free_buffer(bptr);
zh->outstanding_sync--;
destroy_completion_entry(cptr);
+#else
+ abort_singlethreaded(zh);
+#endif
}
}
@@ -4062,6 +3968,76 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
}
+
+int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
+ watcher_fn watcher, void *watcherCtx, int local,
+ void_completion_t *completion, const void *data)
+{
+ char *server_path = prepend_string(zh, path);
+ int rc;
+ struct oarchive *oa;
+ struct RequestHeader h = { get_xid(), ZOO_REMOVE_WATCHES };
+ struct RemoveWatchesRequest req = { (char*)server_path, wtype };
+ watcher_deregistration_t *wdo;
+
+ if (!zh || !isValidPath(server_path, 0)) {
+ rc = ZBADARGUMENTS;
+ goto done;
+ }
+
+ if (!local && is_unrecoverable(zh)) {
+ rc = ZINVALIDSTATE;
+ goto done;
+ }
+
+ if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
+ rc = ZNOWATCHER;
+ goto done;
+ }
+
+ if (local) {
+ removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
+#ifdef THREADED
+ notify_sync_completion((struct sync_completion *)data);
+#endif
+ rc = ZOK;
+ goto done;
+ }
+
+ oa = create_buffer_oarchive();
+ rc = serialize_RequestHeader(oa, "header", &h);
+ rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req);
+ if (rc < 0) {
+ goto done;
+ }
+
+ wdo = create_watcher_deregistration(server_path, watcher, watcherCtx,
+ wtype);
+ if (!wdo) {
+ rc = ZSYSTEMERROR;
+ goto done;
+ }
+
+ enter_critical(zh);
+ rc = add_completion_deregistration(zh, h.xid, COMPLETION_VOID,
+ completion, data, 0, wdo, 0);
+ rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+ get_buffer_len(oa));
+ rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
+ leave_critical(zh);
+
+ /* We queued the buffer, so don't free it */
+ close_buffer_oarchive(&oa, 0);
+
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
+ h.xid, path, zoo_get_current_server(zh));
+
+ adaptor_send_queue(zh, 0);
+
+done:
+ free_duplicate_path(server_path, path);
+ return rc;
+}
void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
int valuelen, const struct ACL_vector *acl, int flags,
char *path_buffer, int path_buffer_len)
@@ -4120,25 +4096,6 @@ void zoo_check_op_init(zoo_op_t *op, const char *path, int version)
op->check_op.version = version;
}
-int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
-{
- int rc;
-
- struct sync_completion *sc = alloc_sync_completion();
- if (!sc) {
- return ZSYSTEMERROR;
- }
-
- rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
- if (rc == ZOK) {
- wait_sync_completion(sc);
- rc = sc->rc;
- }
- free_sync_completion(sc);
-
- return rc;
-}
-
/* specify timeout of 0 to make the function non-blocking */
/* timeout is in milliseconds */
int flush_send_queue(zhandle_t*zh, int timeout)
@@ -4375,6 +4332,126 @@ void zoo_deterministic_conn_order(int yesOrNo)
disable_conn_permute=yesOrNo;
}
+#ifdef THREADED
+
+static void process_sync_completion(zhandle_t *zh,
+ completion_list_t *cptr,
+ struct sync_completion *sc,
+ struct iarchive *ia)
+{
+ LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
+ cptr->c.type, cptr->xid, sc->rc);
+
+ switch(cptr->c.type) {
+ case COMPLETION_DATA:
+ if (sc->rc==0) {
+ struct GetDataResponse res;
+ int len;
+ deserialize_GetDataResponse(ia, "reply", &res);
+ if (res.data.len <= sc->u.data.buff_len) {
+ len = res.data.len;
+ } else {
+ len = sc->u.data.buff_len;
+ }
+ sc->u.data.buff_len = len;
+ // check if len is negative
+ // just of NULL which is -1 int
+ if (len == -1) {
+ sc->u.data.buffer = NULL;
+ } else {
+ memcpy(sc->u.data.buffer, res.data.buff, len);
+ }
+ sc->u.data.stat = res.stat;
+ deallocate_GetDataResponse(&res);
+ }
+ break;
+ case COMPLETION_STAT:
+ if (sc->rc==0) {
+ struct SetDataResponse res;
+ deserialize_SetDataResponse(ia, "reply", &res);
+ sc->u.stat = res.stat;
+ deallocate_SetDataResponse(&res);
+ }
+ break;
+ case COMPLETION_STRINGLIST:
+ if (sc->rc==0) {
+ struct GetChildrenResponse res;
+ deserialize_GetChildrenResponse(ia, "reply", &res);
+ sc->u.strs2 = res.children;
+ /* We don't deallocate since we are passing it back */
+ // deallocate_GetChildrenResponse(&res);
+ }
+ break;
+ case COMPLETION_STRINGLIST_STAT:
+ if (sc->rc==0) {
+ struct GetChildren2Response res;
+ deserialize_GetChildren2Response(ia, "reply", &res);
+ sc->u.strs_stat.strs2 = res.children;
+ sc->u.strs_stat.stat2 = res.stat;
+ /* We don't deallocate since we are passing it back */
+ // deallocate_GetChildren2Response(&res);
+ }
+ break;
+ case COMPLETION_STRING:
+ if (sc->rc==0) {
+ struct CreateResponse res;
+ int len;
+ const char * client_path;
+ deserialize_CreateResponse(ia, "reply", &res);
+ //ZOOKEEPER-1027
+ client_path = sub_string(zh, res.path);
+ len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
+ len = sc->u.str.str_len;
+ }
+ if (len > 0) {
+ memcpy(sc->u.str.str, client_path, len - 1);
+ sc->u.str.str[len - 1] = '\0';
+ }
+ free_duplicate_path(client_path, res.path);
+ deallocate_CreateResponse(&res);
+ }
+ break;
+ case COMPLETION_STRING_STAT:
+ if (sc->rc==0) {
+ struct Create2Response res;
+ int len;
+ const char * client_path;
+ deserialize_Create2Response(ia, "reply", &res);
+ client_path = sub_string(zh, res.path);
+ len = strlen(client_path) + 1;
+ if (len > sc->u.str.str_len) {
+ len = sc->u.str.str_len;
+ }
+ if (len > 0) {
+ memcpy(sc->u.str.str, client_path, len - 1);
+ sc->u.str.str[len - 1] = '\0';
+ }
+ free_duplicate_path(client_path, res.path);
+ sc->u.stat = res.stat;
+ deallocate_Create2Response(&res);
+ }
+ break;
+ case COMPLETION_ACLLIST:
+ if (sc->rc==0) {
+ struct GetACLResponse res;
+ deserialize_GetACLResponse(ia, "reply", &res);
+ sc->u.acl.acl = res.acl;
+ sc->u.acl.stat = res.stat;
+ /* We don't deallocate since we are passing it back */
+ //deallocate_GetACLResponse(&res);
+ }
+ break;
+ case COMPLETION_VOID:
+ break;
+ case COMPLETION_MULTI:
+ sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
+ break;
+ default:
+ LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
+ break;
+ }
+}
+
/*---------------------------------------------------------------------------*
* SYNC API
*---------------------------------------------------------------------------*/
@@ -4712,70 +4789,22 @@ int zoo_remove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
return rc;
}
-int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
- watcher_fn watcher, void *watcherCtx, int local,
- void_completion_t *completion, const void *data)
+int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
{
- char *server_path = prepend_string(zh, path);
int rc;
- struct oarchive *oa;
- struct RequestHeader h = { get_xid(), ZOO_REMOVE_WATCHES };
- struct RemoveWatchesRequest req = { (char*)server_path, wtype };
- watcher_deregistration_t *wdo;
-
- if (!zh || !isValidPath(server_path, 0)) {
- rc = ZBADARGUMENTS;
- goto done;
- }
- if (!local && is_unrecoverable(zh)) {
- rc = ZINVALIDSTATE;
- goto done;
- }
-
- if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
- rc = ZNOWATCHER;
- goto done;
- }
-
- if (local) {
- removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
- notify_sync_completion((struct sync_completion *)data);
- rc = ZOK;
- goto done;
- }
-
- oa = create_buffer_oarchive();
- rc = serialize_RequestHeader(oa, "header", &h);
- rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req);
- if (rc < 0) {
- goto done;
+ struct sync_completion *sc = alloc_sync_completion();
+ if (!sc) {
+ return ZSYSTEMERROR;
}
- wdo = create_watcher_deregistration(server_path, watcher, watcherCtx,
- wtype);
- if (!wdo) {
- rc = ZSYSTEMERROR;
- goto done;
+ rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
+ if (rc == ZOK) {
+ wait_sync_completion(sc);
+ rc = sc->rc;
}
+ free_sync_completion(sc);
- enter_critical(zh);
- rc = add_completion_deregistration(zh, h.xid, COMPLETION_VOID,
- completion, data, 0, wdo, 0);
- rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
- get_buffer_len(oa));
- rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
- leave_critical(zh);
-
- /* We queued the buffer, so don't free it */
- close_buffer_oarchive(&oa, 0);
-
- LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
- h.xid, path, zoo_get_current_server(zh));
-
- adaptor_send_queue(zh, 0);
-
-done:
- free_duplicate_path(server_path, path);
return rc;
}
+#endif
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/tests/TestClient.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestClient.cc b/src/c/tests/TestClient.cc
index ab6a133..92b1699 100644
--- a/src/c/tests/TestClient.cc
+++ b/src/c/tests/TestClient.cc
@@ -47,6 +47,10 @@ struct buff_struct_2 {
char *buffer;
};
+// TODO(br33d): the vast majority of this test is not usable with single threaded.
+// it needs a overhaul to work properly with both threaded and single
+// threaded (ZOOKEEPER-2640)
+#ifdef THREADED
// For testing LogMessage Callback functionality
list<string> logMessages;
void logMessageHandler(const char* message) {
@@ -1415,3 +1419,4 @@ volatile int Zookeeper_simpleSystem::count;
zhandle_t *Zookeeper_simpleSystem::async_zk;
const char Zookeeper_simpleSystem::hostPorts[] = "127.0.0.1:22181";
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_simpleSystem);
+#endif
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/tests/TestMulti.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestMulti.cc b/src/c/tests/TestMulti.cc
index a54e047..0ee9566 100644
--- a/src/c/tests/TestMulti.cc
+++ b/src/c/tests/TestMulti.cc
@@ -160,12 +160,12 @@ public:
}
} watchctx_t;
+#ifdef THREADED
class Zookeeper_multi : public CPPUNIT_NS::TestFixture
{
CPPUNIT_TEST_SUITE(Zookeeper_multi);
//FIXME: None of these tests pass in single-threaded mode. It seems to be a
//flaw in the test suite setup.
-#ifdef THREADED
CPPUNIT_TEST(testCreate);
CPPUNIT_TEST(testCreateDelete);
CPPUNIT_TEST(testInvalidVersion);
@@ -178,7 +178,6 @@ class Zookeeper_multi : public CPPUNIT_NS::TestFixture
CPPUNIT_TEST(testCheck);
CPPUNIT_TEST(testWatch);
CPPUNIT_TEST(testSequentialNodeCreateInAsyncMulti);
-#endif
CPPUNIT_TEST_SUITE_END();
static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
@@ -701,3 +700,4 @@ public:
volatile int Zookeeper_multi::count;
const char Zookeeper_multi::hostPorts[] = "127.0.0.1:22181";
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_multi);
+#endif
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/tests/TestReadOnlyClient.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestReadOnlyClient.cc b/src/c/tests/TestReadOnlyClient.cc
index 37ab147..d73f189 100644
--- a/src/c/tests/TestReadOnlyClient.cc
+++ b/src/c/tests/TestReadOnlyClient.cc
@@ -27,11 +27,10 @@
#include "Util.h"
#include "WatchUtil.h"
+#ifdef THREADED
class Zookeeper_readOnly : public CPPUNIT_NS::TestFixture {
CPPUNIT_TEST_SUITE(Zookeeper_readOnly);
-#ifdef THREADED
CPPUNIT_TEST(testReadOnly);
-#endif
CPPUNIT_TEST_SUITE_END();
static void watcher(zhandle_t* zh, int type, int state,
@@ -108,3 +107,4 @@ public:
};
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_readOnly);
+#endif
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/482ce218/src/c/tests/TestReconfigServer.cc
----------------------------------------------------------------------
diff --git a/src/c/tests/TestReconfigServer.cc b/src/c/tests/TestReconfigServer.cc
index b86b33d..2893a1b 100644
--- a/src/c/tests/TestReconfigServer.cc
+++ b/src/c/tests/TestReconfigServer.cc
@@ -25,6 +25,7 @@
#include "Util.h"
#include "ZooKeeperQuorumServer.h"
+#ifdef THREADED
class TestReconfigServer : public CPPUNIT_NS::TestFixture {
CPPUNIT_TEST_SUITE(TestReconfigServer);
#ifdef THREADED
@@ -33,7 +34,6 @@ class TestReconfigServer : public CPPUNIT_NS::TestFixture {
CPPUNIT_TEST(testRemoveFollower);
CPPUNIT_TEST(testReconfigFailureWithoutAuth);
CPPUNIT_TEST(testReconfigFailureWithoutServerSuperuserPasswordConfigured);
-#endif
CPPUNIT_TEST_SUITE_END();
public:
@@ -418,3 +418,4 @@ testReconfigFailureWithoutServerSuperuserPasswordConfigured() {
}
CPPUNIT_TEST_SUITE_REGISTRATION(TestReconfigServer);
+#endif