You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ak...@apache.org on 2008/07/24 23:46:31 UTC
svn commit: r679557 [2/3] - in /hadoop/zookeeper/trunk/src/c: ./ include/
src/ src/hashtable/ tests/
Modified: hadoop/zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zookeeper.c?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ hadoop/zookeeper/trunk/src/c/src/zookeeper.c Thu Jul 24 14:46:30 2008
@@ -29,6 +29,8 @@
#include <proto.h>
#include "zk_adaptor.h"
#include "zk_log.h"
+#include "zk_hashtable.h"
+
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -55,50 +57,50 @@
const int EPHEMERAL = 1 << 0;
const int SEQUENCE = 1 << 1;
-const int EXPIRED_SESSION_STATE = -112;
-const int AUTH_FAILED_STATE = -113;
-const int CONNECTING_STATE = 1;
-const int ASSOCIATING_STATE = 2;
-const int CONNECTED_STATE = 3;
+const int EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
+const int AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
+const int CONNECTING_STATE = CONNECTING_STATE_DEF;
+const int ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
+const int CONNECTED_STATE = CONNECTED_STATE_DEF;
static __attribute__ ((unused)) const char* state2String(int state){
switch(state){
case 0:
return "CLOSED_STATE";
- case 1 /*CONNECTING_STATE*/:
+ case CONNECTING_STATE_DEF:
return "CONNECTING_STATE";
- case 2 /*ASSOCIATING_STATE*/:
+ case ASSOCIATING_STATE_DEF:
return "ASSOCIATING_STATE";
- case 3 /*CONNECTED_STATE*/:
+ case CONNECTED_STATE_DEF:
return "CONNECTED_STATE";
- case -112 /*EXPIRED_SESSION_STATE*/:
+ case EXPIRED_SESSION_STATE_DEF:
return "EXPIRED_SESSION_STATE";
- case -113 /*AUTH_FAILED_STATE*/:
+ case AUTH_FAILED_STATE_DEF:
return "AUTH_FAILED_STATE";
}
return "INVALID_STATE";
}
-const int CREATED_EVENT = 1;
-const int DELETED_EVENT = 2;
-const int CHANGED_EVENT = 3;
-const int CHILD_EVENT = 4;
-const int SESSION_EVENT = -1;
-const int NOTWATCHING_EVENT = -2;
+const int CREATED_EVENT = CREATED_EVENT_DEF;
+const int DELETED_EVENT = DELETED_EVENT_DEF;
+const int CHANGED_EVENT = CHANGED_EVENT_DEF;
+const int CHILD_EVENT = CHILD_EVENT_DEF;
+const int SESSION_EVENT = SESSION_EVENT_DEF;
+const int NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF;
static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
switch(ev){
case 0:
return "ERROR_EVENT";
- case 1 /*CREATED_EVENT*/:
+ case CREATED_EVENT_DEF:
return "CREATED_EVENT";
- case 2 /*DELETED_EVENT*/:
+ case DELETED_EVENT_DEF:
return "DELETED_EVENT";
- case 3 /*CHANGED_EVENT*/:
+ case CHANGED_EVENT_DEF:
return "CHANGED_EVENT";
- case 4 /*CHILD_EVENT*/:
+ case CHILD_EVENT_DEF:
return "CHILD_EVENT";
- case -1 /*SESSION_EVENT*/:
+ case SESSION_EVENT_DEF:
return "SESSION_EVENT";
- case -2 /*NOTWATCHING_EVENT*/:
+ case NOTWATCHING_EVENT_DEF:
return "NOTWATCHING_EVENT";
}
return "INVALID_EVENT";
@@ -126,19 +128,6 @@
#define COMPLETION_ACLLIST 4
#define COMPLETION_STRING 5
-const char*err2string(int err);
-static const char* format_endpoint_info(const struct sockaddr* ep);
-static const char* format_current_endpoint_info(zhandle_t* zh);
-static int add_completion(zhandle_t *zh, int xid, int completion_type,
- const void *dc, const void *data, int add_to_front);
-static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
- const char* format,...);
-static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
-
-static int disable_conn_permute=0; // permute enabled by default
-
-static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
-
typedef struct _completion_list {
int xid;
int completion_type; /* one of the COMPLETION_* values */
@@ -153,8 +142,30 @@
const void *data;
buffer_list_t *buffer;
struct _completion_list *next;
+ watcher_registration_t* watcher;
} completion_list_t;
+const char*err2string(int err);
+static const char* format_endpoint_info(const struct sockaddr* ep);
+static const char* format_current_endpoint_info(zhandle_t* zh);
+
+/* completion routine forward declarations */
+static int add_completion(zhandle_t *zh, int xid, int completion_type,
+ const void *dc, const void *data, int add_to_front,watcher_registration_t* wo);
+static completion_list_t* create_completion_entry(int xid, int completion_type,
+ const void *dc, const void *data,watcher_registration_t* wo);
+static void destroy_completion_entry(completion_list_t* c);
+static void queue_completion(completion_head_t *list, completion_list_t *c,
+ int add_to_front);
+
+static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
+ const char* format,...);
+static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
+
+static int disable_conn_permute=0; // permute enabled by default
+
+static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
+
const void *zoo_get_context(zhandle_t *zh)
{
return zh->context;
@@ -194,6 +205,16 @@
{
return (zh->state<0)? ZINVALIDSTATE: ZOK;
}
+
+/* Result checker passed to create_watcher_registration() by zoo_awexists().
+ * Returns non-zero when rc is ZOK or ZNONODE, 0 for any other code. */
+int exists_result_checker(int rc)
+{
+ return rc==ZOK ||rc == ZNONODE;
+}
+
+/* Result checker installed by create_watcher_registration() when the caller
+ * supplies none. Returns non-zero only when rc is ZOK. */
+int default_result_checker(int rc)
+{
+ return rc==ZOK;
+}
/**
* Frees and closes everything associated with a handle,
* including the handle itself.
@@ -219,6 +240,8 @@
zh->addrs = NULL;
}
free_auth_info(&zh->auth);
+ destroy_zk_hashtable(zh->active_node_watchers);
+ destroy_zk_hashtable(zh->active_child_watchers);
}
static void setup_random()
@@ -359,7 +382,7 @@
return &zh->client_id;
}
-static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4){}
+static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){}
watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
{
@@ -412,9 +435,13 @@
zh->last_zxid = 0;
zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
+ zh->active_node_watchers=create_zk_hashtable();
+ zh->active_child_watchers=create_zk_hashtable();
+
if (adaptor_init(zh) == -1) {
goto abort;
}
+
return zh;
abort:
errnosave=errno;
@@ -675,7 +702,7 @@
break;
}
}
- free(cptr);
+ destroy_completion_entry(cptr);
}
}
@@ -1116,11 +1143,6 @@
fprintf(LOGSTREAM,"end\n");
}
-static completion_list_t* create_completion_entry(int xid, int completion_type,
- const void *dc, const void *data);
-static void queue_completion(completion_head_t *list, completion_list_t *c,
- int add_to_front);
-
#ifdef THREADED
// IO thread queues session events to be processed by the completion thread
int queue_session_event(zhandle_t *zh, int state)
@@ -1141,12 +1163,7 @@
close_buffer_oarchive(&oa, 1);
goto error;
}
- if ((cptr=calloc(1,sizeof(*cptr)))==NULL) {
- LOG_ERROR(("out of memory"));
- close_buffer_oarchive(&oa, 1);
- goto error;
- }
- cptr->xid = WATCHER_EVENT_XID;
+ cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
cptr->buffer->curr_offset = get_buffer_len(oa);
if (!cptr->buffer) {
@@ -1180,6 +1197,8 @@
return cptr;
}
+
+/* handles async completion (both single- and multithreaded) */
void process_completions(zhandle_t *zh)
{
completion_list_t *cptr;
@@ -1201,7 +1220,7 @@
/* This is a notification so there aren't any pending requests */
LOG_DEBUG(("Calling a watcher for node [%s], event=%s",
(evt.path==NULL?"NULL":evt.path),watcherEvent2String(type)));
- zh->watcher(zh, type, state, evt.path);
+ deliverWatchers(zh,type,state,evt.path);
deallocate_WatcherEvent(&evt);
} else {
int rc = hdr.err;
@@ -1271,9 +1290,9 @@
}
break;
}
- free_buffer(cptr->buffer);
- free(cptr);
+ activateWatcher(cptr->watcher,rc);
}
+ destroy_completion_entry(cptr);
close_buffer_iarchive(&ia);
}
}
@@ -1331,7 +1350,7 @@
zh->last_zxid = hdr.zxid;
if (hdr.xid == WATCHER_EVENT_XID) {
- completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0);
+ completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
c->buffer = bptr;
queue_completion(&zh->completions_to_process, c, 0);
} else if(hdr.xid == AUTH_XID){
@@ -1378,10 +1397,10 @@
sc->rc = rc;
switch(cptr->completion_type) {
case COMPLETION_DATA:
+ LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
if (rc==0) {
struct GetDataResponse res;
int len;
- LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
deserialize_GetDataResponse(ia, "reply", &res);
if (res.data.len <= sc->u.data.buff_len) {
len = res.data.len;
@@ -1395,18 +1414,18 @@
}
break;
case COMPLETION_STAT:
+ LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
if (rc == 0) {
struct SetDataResponse res;
- LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
deserialize_SetDataResponse(ia, "reply", &res);
sc->u.stat = res.stat;
deallocate_SetDataResponse(&res);
}
break;
case COMPLETION_STRINGLIST:
+ LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
if (rc == 0) {
struct GetChildrenResponse res;
- LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
deserialize_GetChildrenResponse(ia, "reply", &res);
sc->u.strs = res.children;
/* We don't deallocate since we are passing it back */
@@ -1414,10 +1433,10 @@
}
break;
case COMPLETION_STRING:
+ LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
if (rc == 0) {
struct CreateResponse res;
int len;
- LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
deserialize_CreateResponse(ia, "reply", &res);
if (sc->u.str.str_len > strlen(res.path)) {
len = strlen(res.path);
@@ -1430,9 +1449,9 @@
}
break;
case COMPLETION_ACLLIST:
+ LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
if (rc == 0) {
struct GetACLResponse res;
- LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
deserialize_GetACLResponse(ia, "reply", &res);
cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
sc->u.acl.acl = res.acl;
@@ -1445,6 +1464,7 @@
LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
break;
}
+ activateWatcher(cptr->watcher,rc);
notify_sync_completion(sc);
free_buffer(bptr);
zh->outstanding_sync--;
@@ -1468,8 +1488,30 @@
return 0;
}
+/**
+ * Builds a watcher registration for the given path.
+ *
+ * \param path      znode path; a private copy is stored in the registration
+ * \param checker   result checker, or 0 to use default_result_checker
+ * \param watcher   watcher callback; when 0 no registration is created
+ * \param ctx       opaque context stored alongside the watcher
+ * \param activeMap hashtable stored in the registration's activeMap field
+ * \return a heap-allocated registration (released with
+ *         destroy_watcher_registration()), or 0 when watcher is 0 or when
+ *         memory could not be allocated
+ */
+static watcher_registration_t* create_watcher_registration(const char* path,
+        result_checker_fn checker,watcher_fn watcher,void* ctx,
+        zk_hashtable* activeMap){
+    watcher_registration_t* wo;
+    if(watcher==0)
+        return 0;
+    wo=calloc(1,sizeof(watcher_registration_t));
+    if(wo==0){
+        /* previously dereferenced unconditionally */
+        LOG_ERROR(("out of memory"));
+        return 0;
+    }
+    wo->path=strdup(path);
+    if(wo->path==0){
+        /* do not hand out a registration without a path; avoid leaking wo */
+        LOG_ERROR(("out of memory"));
+        free(wo);
+        return 0;
+    }
+    wo->watcher=watcher;
+    wo->context=ctx;
+    wo->checker=checker==0?default_result_checker:checker;
+    wo->activeMap=activeMap;
+    return wo;
+}
+
+/* Releases a watcher registration: frees its path string and then the
+ * registration itself. A null wo is ignored. */
+static void destroy_watcher_registration(watcher_registration_t* wo){
+ if(wo!=0){
+ /* NOTE(review): the (void*) cast presumably drops a const qualifier on path — confirm */
+ free((void*)wo->path);
+ free(wo);
+ }
+}
+
static completion_list_t* create_completion_entry(int xid, int completion_type,
- const void *dc, const void *data)
+ const void *dc, const void *data,watcher_registration_t* wo)
{
completion_list_t *c = calloc(1,sizeof(completion_list_t));
if (!c) {
@@ -1499,15 +1541,24 @@
break;
}
c->xid = xid;
- c->next = 0;
+ c->watcher = wo;
return c;
}
+/* Releases a completion entry: its buffer (when set) via free_buffer(), its
+ * watcher registration via destroy_watcher_registration(), and finally the
+ * entry itself. A null c is ignored. */
+static void destroy_completion_entry(completion_list_t* c){
+ if(c!=0){
+ if(c->buffer!=0)
+ free_buffer(c->buffer);
+ destroy_watcher_registration(c->watcher);
+ free(c);
+ }
+}
+
static void queue_completion(completion_head_t *list, completion_list_t *c,
int add_to_front)
{
- c->next = 0;
+ c->next = 0;
/* appending a new entry to the back of the list */
lock_completion_list(list);
if (list->last) {
@@ -1530,10 +1581,11 @@
}
static int add_completion(zhandle_t *zh, int xid, int completion_type,
- const void *dc, const void *data, int add_to_front)
+ const void *dc, const void *data, int add_to_front,
+ watcher_registration_t* wo)
{
completion_list_t *c =create_completion_entry(xid, completion_type, dc,
- data);
+ data,wo);
if (!c)
return ZSYSTEMERROR;
queue_completion(&zh->sent_requests, c, add_to_front);
@@ -1544,39 +1596,39 @@
}
static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
- const void *data)
+ const void *data,watcher_registration_t* wo)
{
- return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0);
+ return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0,wo);
}
static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
- const void *data)
+ const void *data,watcher_registration_t* wo)
{
- return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0);
+ return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0,wo);
}
static int add_strings_completion(zhandle_t *zh, int xid,
- strings_completion_t dc, const void *data)
+ strings_completion_t dc, const void *data,watcher_registration_t* wo)
{
- return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0);
+ return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0,wo);
}
static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
const void *data)
{
- return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0);
+ return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0,0);
}
static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
const void *data)
{
- return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0);
+ return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0,0);
}
static int add_string_completion(zhandle_t *zh, int xid,
string_completion_t dc, const void *data)
{
- return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0);
+ return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0,0);
}
int zookeeper_close(zhandle_t *zh)
@@ -1625,9 +1677,16 @@
int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
const void *data)
{
+    /* Wrapper over zoo_awget(): a non-zero watch selects the handle's own
+     * watcher (zh->watcher) with zh->context as its context. zh is read here,
+     * ahead of zoo_awget()'s argument validation, so a null handle is passed
+     * on with no watcher instead of being dereferenced. */
+    return zoo_awget(zh,path,(zh!=0 && watch)?zh->watcher:0,
+            zh!=0?zh->context:0,dc,data);
+}
+
+int zoo_awget(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ data_completion_t dc, const void *data)
+{
struct oarchive *oa;
struct RequestHeader h = { .xid = get_xid(), .type = GETDATA_OP};
- struct GetDataRequest req = { (char*)path, watch };
+ struct GetDataRequest req = { (char*)path, watcher!=0 };
int rc;
if (zh==0 || path==0)
@@ -1638,7 +1697,9 @@
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
enter_critical(zh);
- rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data);
+ rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
+ create_watcher_registration(path,0,watcher,watcherCtx,
+ zh->active_node_watchers));
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
leave_critical(zh);
@@ -1673,7 +1734,7 @@
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
enter_critical(zh);
- rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data);
+ rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0);
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
leave_critical(zh);
@@ -1761,11 +1822,18 @@
}
int zoo_aexists(zhandle_t *zh, const char *path, int watch,
+        stat_completion_t sc, const void *data)
+{
+    /* Wrapper over zoo_awexists(): a non-zero watch selects the handle's own
+     * watcher (zh->watcher) with zh->context as its context. zh is read here,
+     * ahead of zoo_awexists()'s argument validation, so a null handle is
+     * passed on with no watcher instead of being dereferenced. */
+    return zoo_awexists(zh,path,(zh!=0 && watch)?zh->watcher:0,
+            zh!=0?zh->context:0,sc,data);
+}
+
+int zoo_awexists(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
stat_completion_t completion, const void *data)
{
struct oarchive *oa;
struct RequestHeader h = { .xid = get_xid(), .type = EXISTS_OP };
- struct ExistsRequest req;
+ struct ExistsRequest req = {(char*)path, watcher!=0 };
int rc;
if (zh==0 || path==0)
@@ -1773,12 +1841,12 @@
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
oa = create_buffer_oarchive();
- req.path = (char*)path;
- req.watch = watch;
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
enter_critical(zh);
- rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data);
+ rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data,
+ create_watcher_registration(path,exists_result_checker,
+ watcher,watcherCtx,zh->active_node_watchers));
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
leave_critical(zh);
@@ -1793,11 +1861,18 @@
}
int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
- strings_completion_t completion, const void *data)
+        strings_completion_t dc, const void *data)
+{
+    /* Wrapper over zoo_awget_children(): a non-zero watch selects the handle's
+     * own watcher (zh->watcher) with zh->context as its context. zh is read
+     * here, ahead of zoo_awget_children()'s argument validation, so a null
+     * handle is passed on with no watcher instead of being dereferenced. */
+    return zoo_awget_children(zh,path,(zh!=0 && watch)?zh->watcher:0,
+            zh!=0?zh->context:0,dc,data);
+}
+
+int zoo_awget_children(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ strings_completion_t dc, const void *data)
{
struct oarchive *oa;
struct RequestHeader h = { .xid = get_xid(), .type = GETCHILDREN_OP};
- struct GetChildrenRequest req;
+ struct GetChildrenRequest req={(char*)path, watcher!=0 };
int rc;
if (zh==0 || path==0)
@@ -1805,12 +1880,12 @@
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
oa = create_buffer_oarchive();
- req.path = (char*)path;
- req.watch = watch;
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
enter_critical(zh);
- rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, completion, data);
+ rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, dc, data,
+ create_watcher_registration(path,0,watcher,watcherCtx,
+ zh->active_child_watchers));
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
leave_critical(zh);
@@ -2131,12 +2206,18 @@
int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
{
+    /* Wrapper over zoo_wexists(): a non-zero watch selects the handle's own
+     * watcher (zh->watcher) with zh->context as its context. A null handle is
+     * passed on with no watcher so that it is not dereferenced here. */
+    return zoo_wexists(zh,path,(zh!=0 && watch)?zh->watcher:0,
+            zh!=0?zh->context:0,stat);
+}
+
+int zoo_wexists(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx, struct Stat *stat)
+{
struct sync_completion *sc = alloc_sync_completion();
int rc;
if (!sc) {
return ZSYSTEMERROR;
}
- rc=zoo_aexists(zh, path, watch, SYNCHRONOUS_MARKER, sc);
+ rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc);
if(rc==ZOK){
wait_sync_completion(sc);
rc = sc->rc;
@@ -2145,12 +2226,20 @@
}
}
free_sync_completion(sc);
- return rc;
+ return rc;
}
int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
int* buffer_len, struct Stat *stat)
{
+    /* Wrapper over zoo_wget(): a non-zero watch selects the handle's own
+     * watcher (zh->watcher) with zh->context as its context. A null handle is
+     * passed on with no watcher so that it is not dereferenced here. */
+    return zoo_wget(zh,path,(zh!=0 && watch)?zh->watcher:0,
+            zh!=0?zh->context:0,buffer,buffer_len,stat);
+}
+
+int zoo_wget(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ char *buffer, int* buffer_len, struct Stat *stat)
+{
struct sync_completion *sc;
int rc=0;
@@ -2161,7 +2250,7 @@
sc->u.data.buffer = buffer;
sc->u.data.buff_len = *buffer_len;
- rc=zoo_aget(zh, path, watch, SYNCHRONOUS_MARKER, sc);
+ rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
if(rc==ZOK){
wait_sync_completion(sc);
rc = sc->rc;
@@ -2195,12 +2284,19 @@
int zoo_get_children(zhandle_t *zh, const char *path, int watch,
struct String_vector *strings)
{
+    /* Wrapper over zoo_wget_children(): a non-zero watch selects the handle's
+     * own watcher (zh->watcher) with zh->context as its context. A null handle
+     * is passed on with no watcher so that it is not dereferenced here. */
+    return zoo_wget_children(zh,path,(zh!=0 && watch)?zh->watcher:0,
+            zh!=0?zh->context:0,strings);
+}
+
+int zoo_wget_children(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ struct String_vector *strings)
+{
struct sync_completion *sc = alloc_sync_completion();
int rc;
if (!sc) {
return ZSYSTEMERROR;
}
- rc=zoo_aget_children(zh, path, watch, SYNCHRONOUS_MARKER, sc);
+ rc=zoo_awget_children(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
if(rc==ZOK){
wait_sync_completion(sc);
rc = sc->rc;
Added: hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h (added)
+++ hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h Thu Jul 24 14:46:30 2008
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _COLLECTION_UTIL_H_
+#define _COLLECTION_UTIL_H_
+
+/**
+ * \file
+ * CollectionBuilder and DictionaryBuilder classes and collection utility functions
+ */
+
+namespace Util
+{
+
+// *********************************************************
+/** A shortcut to use for building collections.
+ * This class is a wrapper around standard STL collection containers such as vector.
+ * The wrapped container type must provide push_back().
+ * It allows one to conveniently build collections at the variable initialization time:
+ * \code
+ * #include "CollectionUtil.h"
+ * #include "Vector.h" // for ostream << operator overload for STL vector
+ * using namespace Util;
+ *
+ * int main()
+ * {
+ * typedef vector<string> MyVector;
+ * MyVector myVector=CollectionBuilder<MyVector>()("str1")("str2")("str3");
+ * cout<<myVector;
+ * // the following output will be produced:
+ * // [str1,str2,str3]
+ * }
+ * \endcode
+ */
+template <class CONT>
+class CollectionBuilder
+{
+public:
+ /// Type of the collection container.
+ typedef CONT CollectionType;
+ /// Container's value type.
+ typedef typename CollectionType::value_type value_type;
+ /// Container's constant iterator type.
+ typedef typename CollectionType::const_iterator const_iterator;
+ /// Container's size type.
+ typedef typename CollectionType::size_type size_type;
+
+ /** Operator function call overload to allow call chaining.
+ * Forwards to push_back().
+ * \param value the value to be inserted into the container
+ * \return a non-const reference to self
+ */
+ CollectionBuilder<CONT>& operator()(const value_type& value){
+ return push_back(value);
+ }
+ /** Same as regular STL push_back() but allows call chaining.
+ * \param value the value to be inserted into the container
+ * \return a non-const reference to self
+ */
+ CollectionBuilder<CONT>& push_back(const value_type& value){
+ collection_.push_back(value);
+ return *this;
+ }
+ /// \name Standard STL container interface
+ /// @{
+ const_iterator begin() const{return collection_.begin();}
+ const_iterator end() const{return collection_.end();}
+ size_type size() const{return collection_.size();}
+ void clear() {collection_.clear();}
+ ///@}
+ /// Implicit conversion to a const reference to the wrapped container.
+ operator const CollectionType&() const {return collection_;}
+private:
+ /// \cond PRIVATE
+ CollectionType collection_;
+ /// \endcond
+};
+
+
+// *********************************************************
+/** A shortcut to use for building dictionaries.
+ * This class is a wrapper around standard STL associative containers such as map.
+ * It allows one to conveniently build dictionaries at the variable initialization time:
+ * \code
+ * #include "CollectionUtil.h"
+ * #include "Map.h" // for ostream << operator overload for STL map
+ * using namespace Util;
+ *
+ * int main()
+ * {
+ * typedef map<string,int> MyMap;
+ * MyMap myMap=DictionaryBuilder<MyMap>()("str1",1)("str2",2)("str3",3);
+ * cout<<myMap;
+ * // the following output will be produced:
+ * // [str1=1,str2=2,str3=3]
+ * }
+ * \endcode
+ */
+template <class CONT>
+class DictionaryBuilder
+{
+public:
+ /// The type of the associative container
+ typedef CONT DictionaryType;
+ /// Container's element type (usually a pair<key_type,mapped_type>)
+ typedef typename DictionaryType::value_type value_type;
+ /// Container's key type
+ typedef typename DictionaryType::key_type key_type;
+ /// Container's value type
+ typedef typename DictionaryType::mapped_type mapped_type;
+ /// Container's constant iterator type
+ typedef typename DictionaryType::const_iterator const_iterator;
+ /// Container's writable iterator type
+ typedef typename DictionaryType::iterator iterator;
+ /// Container's size type
+ typedef typename DictionaryType::size_type size_type;
+
+ /** Operator function call overload to allow call chaining.
+ * The pair is added through the container's insert().
+ * NOTE(review): for std::map-like containers insert() keeps the existing
+ * value when the key is already present - confirm this is intended.
+ * \param key the value key to be inserted
+ * \param value the value to be inserted into the container
+ * \return a non-const reference to self
+ */
+ DictionaryBuilder<CONT>& operator()(const key_type& key,const mapped_type& value){
+ dict_.insert(value_type(key,value));
+ return *this;
+ }
+ /** Lookup value by key.
+ * \param key the key associated with the value.
+ * \return a non-const iterator pointing to the element whose key matched the \a key parameter
+ */
+ iterator find(const key_type& key){
+ return dict_.find(key);
+ }
+ /** Lookup value by key.
+ * \param key the key associated with the value.
+ * \return a const iterator pointing to the element whose key matched the \a key parameter
+ */
+ const_iterator find(const key_type& key) const{
+ return dict_.find(key);
+ }
+
+ /// \name Standard STL container interface
+ /// @{
+ const_iterator begin() const{return dict_.begin();}
+ const_iterator end() const{return dict_.end();}
+ size_type size() const{return dict_.size();}
+ void clear() {dict_.clear();}
+ ///@}
+ /// Implicit conversion to a const reference to the wrapped container.
+ operator const DictionaryType&() const {return dict_;}
+private:
+ /// The wrapped associative container.
+ DictionaryType dict_;
+};
+
+
+// ***********************************************************
+/** Deletes all dynamically allocated elements of a collection.
+ * C::value_type is expected to be a pointer to a dynamically allocated object, or it won't compile.
+ * The function will iterate over all container elements and call delete for each of them.
+ * The container itself is emptied afterwards (c.clear()).
+ * \param c a collection (vector,set) whose elements are being deleted.
+ */
+template <class C>
+void clearCollection(C& c){
+ for(typename C::const_iterator it=c.begin();it!=c.end();++it)
+ delete *it;
+ c.clear();
+}
+
+/** Deletes all dynamically allocated values of the associative container.
+ * The function expects the M::value_type to be a pair<..., ptr_to_type>, or it won't compile.
+ * It first deletes the objects pointed to by ptr_to_type
+ * and then clears (calls m.clear()) the container.
+ * \param m an associative container (map,hash_map) whose elements are being deleted.
+ */
+template <class M>
+void clearMap(M& m){
+ for(typename M::const_iterator it=m.begin();it!=m.end();++it)
+ delete it->second;
+ m.clear();
+}
+
+} // namespace Util
+
+
+#endif // _COLLECTION_UTIL_H_
Propchange: hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc (added)
+++ hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc Thu Jul 24 14:46:30 2008
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+
+#include "CollectionUtil.h"
+using namespace Util;
+
+#include "Vector.h"
+using namespace std;
+
+#include "src/zk_hashtable.h"
+
+class Zookeeper_hashtable : public CPPUNIT_NS::TestFixture
+{
+ CPPUNIT_TEST_SUITE(Zookeeper_hashtable);
+ CPPUNIT_TEST(testInsertElement1);
+ CPPUNIT_TEST(testInsertElement2);
+ CPPUNIT_TEST(testInsertElement3);
+ CPPUNIT_TEST(testContainsWatcher1);
+ CPPUNIT_TEST(testContainsWatcher2);
+ CPPUNIT_TEST(testCombineHashtable1);
+ CPPUNIT_TEST(testMoveMergeWatchers1);
+ CPPUNIT_TEST(testDeliverSessionEvent1);
+ CPPUNIT_TEST(testDeliverZnodeEvent1);
+ CPPUNIT_TEST_SUITE_END();
+
+ // no-op watcher callback handed to create_watcher_object() by the tests
+ static void watcher(zhandle_t *, int, int, const char *,void*){}
+ zk_hashtable *ht;
+
+public:
+
+ // fixture set-up: creates the hashtable under test
+ void setUp()
+ {
+ ht=create_zk_hashtable();
+ }
+
+ // fixture tear-down: destroys the hashtable created in setUp()
+ void tearDown()
+ {
+ destroy_zk_hashtable(ht);
+ }
+
+    // Collects the context values of the watchers registered for path, in the
+    // order given by getFirstWatcher() followed by the ->next links.
+    // The tests store small integers in the void* context.
+    static vector<int> getWatcherCtxAsVector(zk_hashtable* ht,const char* path){
+        watcher_object_t* wo=getFirstWatcher(ht,path);
+        vector<int> res;
+        while(wo!=0){
+            // convert through long: a direct void* -> int cast is rejected by
+            // C++ compilers when pointers are wider than int
+            res.push_back((int)(long)wo->context);
+            wo=wo->next;
+        }
+        return res;
+    }
+
+ // insert 2 watchers for different paths
+ // verify that hashtable size is 2 and that each path holds the inserted
+ // context; then verify that clean_zk_hashtable() leaves the table empty
+ void testInsertElement1()
+ {
+ CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht));
+ int res=insert_watcher_object(ht,"path1",
+ create_watcher_object(watcher,(void*)1));
+ CPPUNIT_ASSERT_EQUAL(1,res);
+ res=insert_watcher_object(ht,"path2",
+ create_watcher_object(watcher,(void*)1));
+ CPPUNIT_ASSERT_EQUAL(1,res);
+ CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht));
+ vector<int> expWatchers=CollectionBuilder<vector<int> >().push_back(1);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1"));
+ CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path2"));
+ clean_zk_hashtable(ht);
+ CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht));
+ }
+
+ // insert 2 different watchers for the same path;
+ // verify: hashtable element count is 1, and the watcher count for the path
+ // is 2; the expected context order is 2,1 (the later insert is listed first)
+ void testInsertElement2()
+ {
+ int res=insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)1));
+ CPPUNIT_ASSERT_EQUAL(1,res);
+ res=insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+ CPPUNIT_ASSERT_EQUAL(1,res);
+ CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht));
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1"));
+ vector<int> expWatchers=CollectionBuilder<vector<int> >().
+ push_back(2).push_back(1);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1"));
+ }
+
+ // insert 2 identical watchers for the same path;
+ // verify: hashtable element count is 1, the watcher count for the path is 1
+ // (the second insert is expected to return 0, i.e. the duplicate is not stored)
+ void testInsertElement3()
+ {
+ // NOTE(review): only watcher and context are set; any other field of
+ // wobject (e.g. next) stays uninitialized - confirm clone_watcher_object()
+ // does not depend on it
+ watcher_object_t wobject;
+ wobject.watcher=watcher;
+ wobject.context=(void*)1;
+
+ int res=insert_watcher_object(ht,"path1",clone_watcher_object(&wobject));
+ CPPUNIT_ASSERT_EQUAL(1,res);
+ watcher_object_t* wo=clone_watcher_object(&wobject);
+ res=insert_watcher_object(ht,"path1",wo);
+ CPPUNIT_ASSERT_EQUAL(0,res);
+ CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht));
+ CPPUNIT_ASSERT_EQUAL(1,get_watcher_count(ht,"path1"));
+ vector<int> expWatchers=CollectionBuilder<vector<int> >().push_back(1);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1"));
+ // must delete the object that wasn't inserted!
+ free(wo);
+ }
+
+ // verify: the watcher is found in the table
+ // (a clone of `expected` is inserted under "path2" among three other watchers)
+ void testContainsWatcher1()
+ {
+ watcher_object_t expected;
+ expected.watcher=watcher;
+ expected.context=(void*)1;
+
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+ insert_watcher_object(ht,"path2",clone_watcher_object(&expected));
+
+ CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht));
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1"));
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2"));
+
+ int res=contains_watcher(ht,&expected);
+ CPPUNIT_ASSERT(res==1);
+ }
+
+ // verify: the watcher is not found
+ // (none of the four inserted watchers carries the context of `expected`)
+ void testContainsWatcher2()
+ {
+ watcher_object_t expected;
+ expected.watcher=watcher;
+ expected.context=(void*)1;
+
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5));
+
+ CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht));
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1"));
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2"));
+
+ int res=contains_watcher(ht,&expected);
+ CPPUNIT_ASSERT(res==0);
+ }
+
+ // combine two hashtables whose paths partially overlap;
+ // verify: the result holds every path from both tables and a
+ // watcher/context pair present in both tables appears only once per path
+ void testCombineHashtable1()
+ {
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5));
+
+ zk_hashtable* ht2=create_zk_hashtable();
+
+ // path1/ctx 2 duplicates an entry of ht; path2/ctx 6 and path3 are new
+ insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)2));
+ insert_watcher_object(ht2,"path2",create_watcher_object(watcher,(void*)6));
+ insert_watcher_object(ht2,"path3",create_watcher_object(watcher,(void*)2));
+
+ zk_hashtable* res=combine_hashtables(ht,ht2);
+
+ CPPUNIT_ASSERT_EQUAL(3,get_element_count(res));
+ // path1 --> 2,3
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(res,"path1"));
+ vector<int> expWatchers1=CollectionBuilder<vector<int> >().
+ push_back(2).push_back(3);
+ CPPUNIT_ASSERT_EQUAL(expWatchers1,getWatcherCtxAsVector(res,"path1"));
+ // path2 --> 4,5,6
+ // (the expected vector below pins the order in which the contexts are
+ // reported: 6,4,5)
+ CPPUNIT_ASSERT_EQUAL(3,get_watcher_count(res,"path2"));
+ vector<int> expWatchers2=CollectionBuilder<vector<int> >().
+ push_back(6).push_back(4).push_back(5);
+ CPPUNIT_ASSERT_EQUAL(expWatchers2,getWatcherCtxAsVector(res,"path2"));
+ // path3 --> 2
+ CPPUNIT_ASSERT_EQUAL(1,get_watcher_count(res,"path3"));
+ vector<int> expWatchers3=CollectionBuilder<vector<int> >().push_back(2);
+ CPPUNIT_ASSERT_EQUAL(expWatchers3,getWatcherCtxAsVector(res,"path3"));
+
+ // NOTE(review): these two calls are skipped if any assertion above
+ // fails, so the tables leak in that case
+ destroy_zk_hashtable(ht2);
+ destroy_zk_hashtable(res);
+ }
+
+ // move the watchers registered for "path1" out of two source tables into a
+ // new table; verify: the result holds the merged watcher set for that path
+ // only, and the path is gone from both sources
+ void testMoveMergeWatchers1()
+ {
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+ insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+ insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5));
+
+ zk_hashtable* ht2=create_zk_hashtable();
+
+ // ctx 2 duplicates an entry already in ht; ctx 6 is unique to ht2
+ insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)2));
+ insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)6));
+
+ zk_hashtable* res=move_merge_watchers(ht,ht2,"path1");
+
+ // 2+2 source watchers, one duplicate --> 3 in the result
+ CPPUNIT_ASSERT_EQUAL(1,get_element_count(res));
+ CPPUNIT_ASSERT_EQUAL(3,get_watcher_count(res,"path1"));
+ vector<int> expWatchers=CollectionBuilder<vector<int> >().
+ push_back(6).push_back(2).push_back(3);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(res,"path1"));
+
+ // make sure the path entry has been deleted from the source hashtables
+ CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht));
+ CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2"));
+ CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht2));
+
+ // NOTE(review): skipped if an assertion above fails (tables leak then)
+ destroy_zk_hashtable(ht2);
+ destroy_zk_hashtable(res);
+ }
+
+ // Watcher callback used by the deliver_* tests. The tests pass a
+ // vector<int>* disguised as the zhandle_t* argument; every invocation
+ // appends the watcher's context value to that vector so the test can
+ // check which watchers were called and in what order.
+ static void iterWatcher(zhandle_t *zh, int type, int state,
+ const char* path,void* ctx){
+ vector<int>* res=reinterpret_cast<vector<int>*>(zh);
+ // convert via long: a direct void* -> int cast loses precision and is
+ // rejected by g++ on targets where pointers are wider than int
+ res->push_back(static_cast<int>(reinterpret_cast<long>(ctx)));
+ }
+
+ // deliver a session event to a table holding watchers for three paths;
+ // verify: every registered watcher is invoked exactly once
+ void testDeliverSessionEvent1(){
+ insert_watcher_object(ht,"path1",create_watcher_object(iterWatcher,(void*)2));
+ insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)3));
+ insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)4));
+ insert_watcher_object(ht,"path3",create_watcher_object(iterWatcher,(void*)5));
+
+ // iterWatcher treats its zhandle_t* argument as a vector<int>* and
+ // records each context it is called with
+ vector<int> res;
+ deliver_session_event(ht,(zhandle_t*)&res,10,20);
+ // NOTE(review): the expected sequence also pins the order in which paths
+ // are visited; presumably that follows the hashtable's internal
+ // iteration order -- confirm before changing the hash function
+ vector<int> expWatchers=CollectionBuilder<vector<int> >().
+ push_back(4).push_back(3).push_back(5).push_back(2);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,res);
+ }
+
+ // deliver a znode event for an existing path, then for an unknown path;
+ // verify: only the watchers of the matching path are invoked, nothing is
+ // invoked for the unknown path, and the delivered path entry is removed
+ void testDeliverZnodeEvent1(){
+ insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)3));
+ insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)4));
+
+ // iterWatcher appends each watcher context to this vector
+ vector<int> res;
+ deliver_znode_event(ht,(zhandle_t*)&res,"path2",10,20);
+ vector<int> expWatchers=CollectionBuilder<vector<int> >().
+ push_back(4).push_back(3);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,res);
+ // reset both vectors: the next delivery must leave 'res' empty
+ expWatchers.clear();
+ res.clear();
+ // non-existent path
+ deliver_znode_event(ht,(zhandle_t*)&res,"path100",10,20);
+ CPPUNIT_ASSERT_EQUAL(expWatchers,res);
+ // make sure the path entry has been deleted from the source hashtable
+ CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht));
+ }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_hashtable);
Propchange: hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc Thu Jul 24 14:46:30 2008
@@ -41,7 +41,7 @@
CPPUNIT_TEST_SUITE_END();
zhandle_t *zh;
- static void watcher(zhandle_t *, int, int, const char *){}
+ static void watcher(zhandle_t *, int, int, const char *,void*){}
public:
void setUp()
{
@@ -256,7 +256,7 @@
zkServer.addRecvResponse(new PingResponse);
rc=zookeeper_interest(zh,&fd,&interest,&tv);
CPPUNIT_ASSERT_EQUAL(ZOK,rc);
- // sleep for a short while (10 ms)
+ // pseudo-sleep for a short while (10 ms)
timeMock.millitick(10);
rc=zookeeper_process(zh,interest);
CPPUNIT_ASSERT_EQUAL(ZOK,rc);
@@ -575,12 +575,26 @@
changed_=true;
if(path!=0) path_=path;
}
+ // this predicate checks whether a CHANGED_EVENT was delivered, unlike
+ // isWatcherTriggered(), which returns true whenever a watcher is triggered
+ // regardless of the event type
SyncedBoolCondition isNodeChangedTriggered() const{
return SyncedBoolCondition(changed_,mx_);
}
bool changed_;
string path_;
};
+
+ // Completion handler that reacts to a stat response by queueing a
+ // CHANGED_EVENT for "/x/y/z" on the simulated server, so the watch event
+ // reaches the client only after the server response has been received.
+ class AsyncWatcherCompletion: public AsyncCompletion{
+ public:
+ ZookeeperServer& zkServer_;
+
+ AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
+
+ // rc and stat are ignored: any stat completion enqueues the event
+ virtual void statCompl(int rc, const Stat *stat){
+ ZNodeEvent* changeEvent=new ZNodeEvent(CHANGED_EVENT,"/x/y/z");
+ zkServer_.addRecvResponse(changeEvent);
+ }
+ };
// verify that async watcher is called for znode events (CREATED, DELETED etc.)
void testAsyncWatcher1(){
Mock_gettimeofday timeMock;
@@ -596,9 +610,14 @@
CPPUNIT_ASSERT(zh!=0);
// make sure the client has connected
CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
-
- // trigger the watcher
- zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+
+ // set the watcher
+ AsyncWatcherCompletion completion(zkServer);
+ // prepare a response for the zoo_aexists() request
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);
}
Added: hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc (added)
+++ hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc Thu Jul 24 14:46:30 2008
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include "ZKMocks.h"
+#include "CollectionUtil.h"
+
+class Zookeeper_watchers : public CPPUNIT_NS::TestFixture
+{
+ CPPUNIT_TEST_SUITE(Zookeeper_watchers);
+ CPPUNIT_TEST(testDefaultSessionWatcher1);
+ CPPUNIT_TEST(testDefaultSessionWatcher2);
+ CPPUNIT_TEST(testObjectSessionWatcher1);
+ CPPUNIT_TEST(testObjectSessionWatcher2);
+ CPPUNIT_TEST(testNodeWatcher1);
+ CPPUNIT_TEST(testChildWatcher1);
+ CPPUNIT_TEST(testChildWatcher2);
+ CPPUNIT_TEST_SUITE_END();
+
+ static void watcher(zhandle_t *, int, int, const char *,void*){}
+ zhandle_t *zh;
+
+public:
+
+ // Fixture initialization run before each test: start without a handle,
+ // turn off deterministic connection ordering and silence the client log.
+ void setUp()
+ {
+ zh=0;
+ zoo_deterministic_conn_order(0);
+ zoo_set_debug_level(static_cast<ZooLogLevel>(0)); // disable logging
+ }
+
+ // Fixture cleanup: close whatever handle is still stored in zh.
+ void tearDown()
+ {
+ // NOTE(review): the tests in this file also close the handle through
+ // CloseFinally guard(&zh); presumably the guard resets zh so that this
+ // call sees a null handle -- confirm against the CloseFinally helper
+ zookeeper_close(zh);
+ }
+
+ // Watcher action that records the "connection established" notification:
+ // it remembers that the event happened and counts its deliveries.
+ class ConnectionWatcher: public WatcherAction{
+ public:
+ bool connected_;
+ int counter_;
+
+ ConnectionWatcher():connected_(false),counter_(0){}
+
+ // condition object built from connected_ and mx_, used with ensureCondition()
+ SyncedBoolCondition isConnectionEstablished() const{
+ return SyncedBoolCondition(connected_,mx_);
+ }
+
+ virtual void onConnectionEstablished(zhandle_t*){
+ synchronized(mx_);
+ connected_=true;
+ ++counter_;
+ }
+ };
+
+ // Watcher action that records the "connection lost" notification:
+ // it remembers that the event happened and counts its deliveries.
+ class DisconnectWatcher: public WatcherAction{
+ public:
+ bool disconnected_;
+ int counter_;
+
+ DisconnectWatcher():disconnected_(false),counter_(0){}
+
+ // condition object built from disconnected_ and mx_
+ SyncedBoolCondition isDisconnected() const{
+ return SyncedBoolCondition(disconnected_,mx_);
+ }
+
+ virtual void onConnectionLost(zhandle_t*){
+ synchronized(mx_);
+ disconnected_=true;
+ ++counter_;
+ }
+ };
+
+ // Watcher action that counts both node-value-changed and connection-lost
+ // notifications in a single counter; a connection loss is also flagged
+ // separately in disconnected_.
+ class CountingDataWatcher: public WatcherAction{
+ public:
+ bool disconnected_;
+ int counter_;
+
+ CountingDataWatcher():disconnected_(false),counter_(0){}
+
+ virtual void onConnectionLost(zhandle_t*){
+ synchronized(mx_);
+ disconnected_=true;
+ ++counter_;
+ }
+
+ virtual void onNodeValueChanged(zhandle_t*,const char* path){
+ synchronized(mx_);
+ ++counter_;
+ }
+ };
+
+ // Watcher action that counts node-deleted notifications.
+ class DeletionCountingDataWatcher: public WatcherAction{
+ public:
+ int counter_;
+
+ DeletionCountingDataWatcher():counter_(0){}
+
+ // the path argument is ignored; only the number of calls is tracked
+ virtual void onNodeDeleted(zhandle_t*,const char* path){
+ synchronized(mx_);
+ ++counter_;
+ }
+ };
+
+ // Watcher action that counts child-changed notifications.
+ class ChildEventCountingWatcher: public WatcherAction{
+ public:
+ int counter_;
+
+ ChildEventCountingWatcher():counter_(0){}
+
+ // the path argument is ignored; only the number of calls is tracked
+ virtual void onChildChanged(zhandle_t*,const char* path){
+ synchronized(mx_);
+ ++counter_;
+ }
+ };
+
+#ifndef THREADED
+
+ // verify: the default watcher is called once for a session event
+ void testDefaultSessionWatcher1(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // &watcher is handed to zookeeper_init() as the handle context;
+ // activeWatcher is presumably what forwards events to it (see ZKMocks.h)
+ ConnectionWatcher watcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &watcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+
+ // single-threaded build: the test drives the client state machine itself
+ int fd=0;
+ int interest=0;
+ timeval tv;
+ // open the socket
+ int rc=zookeeper_interest(zh,&fd,&interest,&tv);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ CPPUNIT_ASSERT_EQUAL(CONNECTING_STATE,zoo_state(zh));
+ // send the handshake packet to the server
+ rc=zookeeper_process(zh,interest);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ CPPUNIT_ASSERT_EQUAL(ASSOCIATING_STATE,zoo_state(zh));
+ // receive the server handshake response
+ rc=zookeeper_process(zh,interest);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // verify connected
+ CPPUNIT_ASSERT_EQUAL(CONNECTED_STATE,zoo_state(zh));
+ CPPUNIT_ASSERT(watcher.connected_);
+ // the connection event must have been delivered exactly once
+ CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+ }
+
+ // test case: connect to server, set a default watcher, disconnect from the server
+ // verify: the default watcher is called once
+ void testDefaultSessionWatcher2(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // default watcher context: counts connection-loss notifications only
+ DisconnectWatcher watcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &watcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // simulate connected state
+ forceConnected(zh);
+
+ // first operation
+ AsyncCompletion ignored;
+ zkServer.addOperationResponse(new ZooGetResponse("1",1));
+ // the third argument (watch flag) is 0 here
+ int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // this will process the response and activate the watcher
+ rc=zookeeper_process(zh,ZOOKEEPER_READ);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // now, disconnect
+ zkServer.setConnectionLost();
+ rc=zookeeper_process(zh,ZOOKEEPER_READ);
+ CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc);
+ // verify disconnected
+ CPPUNIT_ASSERT(watcher.disconnected_);
+ // and notified exactly once
+ CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+ }
+
+ // testcase: connect to the server, set a watcher object on a node,
+ // disconnect from the server
+ // verify: the watcher object as well as the default watcher are called
+ void testObjectSessionWatcher1(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // default (handle-level) watcher context
+ DisconnectWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // simulate connected state
+ forceConnected(zh);
+
+ AsyncCompletion ignored;
+ // path-specific watcher object registered through zoo_awexists()
+ CountingDataWatcher wobject;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_awexists(zh,"/x/y/1",activeWatcher,&wobject,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // this will process the response and activate the watcher
+ rc=zookeeper_process(zh,ZOOKEEPER_READ);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // now, disconnect
+ zkServer.setConnectionLost();
+ rc=zookeeper_process(zh,ZOOKEEPER_READ);
+ CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc);
+
+ // verify the default watcher has been triggered
+ CPPUNIT_ASSERT(defWatcher.disconnected_);
+ // and triggered only once
+ CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+
+ // the path-specific watcher has been triggered as well
+ CPPUNIT_ASSERT(wobject.disconnected_);
+ // only once!
+ CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+ }
+
+ // testcase: connect to the server, set a watcher object on a node,
+ // set a def watcher on another node,disconnect from the server
+ // verify: the watcher object as well as the default watcher are called
+ void testObjectSessionWatcher2(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // default (handle-level) watcher context
+ DisconnectWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // simulate connected state
+ forceConnected(zh);
+
+ // set the default watcher
+ // (watch flag 1 on zoo_aexists, no explicit watcher object)
+ AsyncCompletion ignored;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // path-specific watcher object on a different node
+ CountingDataWatcher wobject;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // this will process the response and activate the watcher
+ // (loop until the call stops returning ZOK; ZNOTHING is expected then)
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // disconnect now
+ zkServer.setConnectionLost();
+ rc=zookeeper_process(zh,ZOOKEEPER_READ);
+ CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc);
+
+ // verify the default watcher has been triggered
+ CPPUNIT_ASSERT(defWatcher.disconnected_);
+ // and triggered only once
+ CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+
+ // the path-specific watcher has been triggered as well
+ CPPUNIT_ASSERT(wobject.disconnected_);
+ // only once!
+ CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+ }
+
+ // testcase: register 2 node watches for different paths, trigger the watches
+ // verify: the data watchers are processed, the default watcher is not called
+ void testNodeWatcher1(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // the default watcher must count node-changed events, otherwise the
+ // final "default watcher is not called" check could never fail; this
+ // matches the threaded variant of this test
+ CountingDataWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // simulate connected state
+ forceConnected(zh);
+
+ AsyncCompletion ignored;
+ // first path-specific watcher
+ CountingDataWatcher wobject1;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // second path-specific watcher, on a different node
+ CountingDataWatcher wobject2;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // this will process the response and activate the watcher
+ // (loop until the call stops returning ZOK; ZNOTHING is expected then)
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // we are all set now; let's trigger the watches
+ zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/a/b/c"));
+ zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+ // make sure all watchers have been processed
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // each path watcher fired once; the default watcher saw nothing
+ CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+ CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
+ CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+ }
+
+ // testcase: set up both a child watcher and a data watcher on the node /a,
+ // then delete the node by sending a DELETED_EVENT event
+ // verify: both watchers are triggered
+ void testChildWatcher1(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // default watcher counts deletions too, so the last assertion can detect
+ // a DELETED_EVENT wrongly routed to it
+ DeletionCountingDataWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // simulate connected state
+ forceConnected(zh);
+
+ AsyncCompletion ignored;
+ // node watch on /a, registered through zoo_awexists()
+ DeletionCountingDataWatcher wobject1;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // child watch on the same node, registered through zoo_awget_children()
+ typedef ZooGetChildrenResponse::StringVector ZooVector;
+ zkServer.addOperationResponse(new ZooGetChildrenResponse(
+ Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+ ));
+ DeletionCountingDataWatcher wobject2;
+ rc=zoo_awget_children(zh,"/a",activeWatcher,
+ &wobject2,asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // this will process the response and activate the watcher
+ // (loop until the call stops returning ZOK; ZNOTHING is expected then)
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // we are all set now; let's trigger the watches
+ zkServer.addRecvResponse(new ZNodeEvent(DELETED_EVENT,"/a"));
+ // make sure the watchers have been processed
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // both the node watch and the child watch fired once
+ CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+ CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
+ CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+ }
+
+ // testcase: create both a child and data watch on the node /a, send a CHILD_EVENT
+ // verify: only the child watch triggered
+ void testChildWatcher2(){
+ Mock_gettimeofday timeMock;
+ ZookeeperServer zkServer;
+ // must call zookeeper_close() while all the mocks are in scope
+ CloseFinally guard(&zh);
+
+ // default watcher counts child events too, so the last assertion can
+ // detect a CHILD_EVENT wrongly routed to it
+ ChildEventCountingWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // simulate connected state
+ forceConnected(zh);
+
+ AsyncCompletion ignored;
+ // node watch on /a, registered through zoo_awexists()
+ ChildEventCountingWatcher wobject1;
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // child watch on the same node, registered through zoo_awget_children()
+ typedef ZooGetChildrenResponse::StringVector ZooVector;
+ zkServer.addOperationResponse(new ZooGetChildrenResponse(
+ Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+ ));
+ ChildEventCountingWatcher wobject2;
+ rc=zoo_awget_children(zh,"/a",activeWatcher,
+ &wobject2,asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // this will process the response and activate the watcher
+ // (loop until the call stops returning ZOK; ZNOTHING is expected then)
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // we are all set now; let's trigger the watches
+ zkServer.addRecvResponse(new ZNodeEvent(CHILD_EVENT,"/a"));
+ // make sure the watchers have been processed
+ while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+ CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+ // only the child watch (wobject2) is expected to fire
+ CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
+ CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
+ CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+ }
+
+#else
+ // verify: the default watcher is called once for a session event
+ void testDefaultSessionWatcher1(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTED_STATE);
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // &watcher is handed to zookeeper_init() as the handle context
+ ConnectionWatcher watcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &watcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // wait till watcher processing has completed (the connection
+ // established event)
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+ // verify the watcher has been triggered
+ CPPUNIT_ASSERT(ensureCondition(watcher.isConnectionEstablished(),1000)<1000);
+ // triggered only once
+ CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+ }
+
+ // test case: connect to server, set a default watcher, disconnect from the server
+ // verify: the default watcher is called once
+ void testDefaultSessionWatcher2(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE);
+ DisconnectWatcher watcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &watcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // make sure the client has connected
+ CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+ // set a default watch
+ AsyncCompletion ignored;
+ // a successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ // now, initiate a disconnect
+ zkServer.setConnectionLost();
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+ // verify the watcher has been triggered
+ // NOTE(review): the fields are read directly here, without going through
+ // watcher.isDisconnected(); this relies on the wait above
+ CPPUNIT_ASSERT(watcher.disconnected_);
+ // triggered only once
+ CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+ }
+
+ // testcase: connect to the server, set a watcher object on a node,
+ // disconnect from the server
+ // verify: the watcher object as well as the default watcher are called
+ void testObjectSessionWatcher1(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE);
+ DisconnectWatcher defWatcher;
+ // use the tracker to find out when the watcher has been activated
+ WatcherActivationTracker activationTracker;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // make sure the client has connected
+ CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+ AsyncCompletion ignored;
+ // this successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ CountingDataWatcher wobject;
+ // start tracking before the request is issued
+ activationTracker.track(&wobject);
+ // set a path-specific watcher
+ int rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // make sure the watcher gets activated before we continue
+ CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+ // now, initiate a disconnect
+ zkServer.setConnectionLost();
+ // make sure all watchers have been processed
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+ // verify the default watcher has been triggered
+ CPPUNIT_ASSERT(defWatcher.disconnected_);
+ // and triggered only once
+ CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+
+ // the path-specific watcher has been triggered as well
+ CPPUNIT_ASSERT(wobject.disconnected_);
+ // only once!
+ CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+ }
+
+ // testcase: connect to the server, set a watcher object on a node,
+ // set a def watcher on another node,disconnect from the server
+ // verify: the watcher object as well as the default watcher are called
+ void testObjectSessionWatcher2(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE);
+ DisconnectWatcher defWatcher;
+ // use the tracker to find out when the watcher has been activated
+ WatcherActivationTracker activationTracker;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // make sure the client has connected
+ CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+ // set a default watch
+ AsyncCompletion ignored;
+ // a successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ // the default watcher object is the one being activated by this request
+ activationTracker.track(&defWatcher);
+ int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // make sure the watcher gets activated before we continue
+ CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+ // this successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ CountingDataWatcher wobject;
+ // re-target the same tracker at the path-specific watcher
+ activationTracker.track(&wobject);
+ // set a path-specific watcher
+ rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // make sure the watcher gets activated before we continue
+ CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+ // now, initiate a disconnect
+ zkServer.setConnectionLost();
+ // make sure all watchers have been processed
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+ // verify the default watcher has been triggered
+ CPPUNIT_ASSERT(defWatcher.disconnected_);
+ // and triggered only once
+ CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+
+ // the path-specific watcher has been triggered as well
+ CPPUNIT_ASSERT(wobject.disconnected_);
+ // only once!
+ CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+ }
+
+ // testcase: register 2 node watches for different paths, trigger the watches
+ // verify: the data watchers are processed, the default watcher is not called
+ void testNodeWatcher1(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(CHANGED_EVENT,0,false);
+ // counts node-changed events, so the final check can detect a
+ // CHANGED_EVENT wrongly routed to the default watcher
+ CountingDataWatcher defWatcher;
+ // use the tracker to find out when the watcher has been activated
+ WatcherActivationTracker activationTracker;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // make sure the client has connected
+ CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+ // don't care about completions
+ AsyncCompletion ignored;
+ // set a one-shot watch
+ // a successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ CountingDataWatcher wobject1;
+ activationTracker.track(&wobject1);
+ int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // make sure the watcher gets activated before we continue
+ CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+ // this successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ CountingDataWatcher wobject2;
+ activationTracker.track(&wobject2);
+ // set a path-specific watcher
+ rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
+ asyncCompletion,&ignored);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ // make sure the watcher gets activated before we continue
+ CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+ // we are all set now; let's trigger the watches
+ zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/a/b/c"));
+ zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+ // make sure all watchers have been processed
+ // (two events were queued, so wait for a delivery count of 2)
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.deliveryCounterEquals(2),1000)<1000);
+
+ // each path watcher fired once; the default watcher saw nothing
+ CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+ CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
+ CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+ }
+
+ // testcase: set up both a child watcher and a data watcher on the node /a,
+ // then delete the node (that is, send a DELETED_EVENT)
+ // verify: both watchers are triggered
+ void testChildWatcher1(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(DELETED_EVENT,0);
+ // default watcher counts deletions too, so the last assertion can detect
+ // a DELETED_EVENT wrongly routed to it
+ DeletionCountingDataWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // make sure the client has connected
+ CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+ // a successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ DeletionCountingDataWatcher wobject1;
+ Stat stat;
+ // add a node watch
+ int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ typedef ZooGetChildrenResponse::StringVector ZooVector;
+ zkServer.addOperationResponse(new ZooGetChildrenResponse(
+ Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+ ));
+ DeletionCountingDataWatcher wobject2;
+ String_vector children;
+ // add a child watch on the same node
+ rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
+ // check rc before touching 'children': it is not initialized above, so
+ // it must only be released once the call is known to have succeeded
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ deallocate_String_vector(&children);
+
+ // we are all set now; let's trigger the watches
+ zkServer.addRecvResponse(new ZNodeEvent(DELETED_EVENT,"/a"));
+ // make sure the watchers have been processed
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+ // both the node watch and the child watch fired once
+ CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+ CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
+ CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+ }
+
+ // testcase: create both a child and data watch on the node /a, send a CHILD_EVENT
+ // verify: only the child watch triggered
+ void testChildWatcher2(){
+ Mock_gettimeofday timeMock;
+ // zookeeper simulator
+ ZookeeperServer zkServer;
+ Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+ // must call zookeeper_close() while all the mocks are in the scope!
+ CloseFinally guard(&zh);
+
+ // detects when all watchers have been delivered
+ WatcherDeliveryTracker deliveryTracker(CHILD_EVENT,0);
+ // default watcher counts child events too, so the last assertion can
+ // detect a CHILD_EVENT wrongly routed to it
+ ChildEventCountingWatcher defWatcher;
+ zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+ &defWatcher,0);
+ CPPUNIT_ASSERT(zh!=0);
+ // make sure the client has connected
+ CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+ // a successful server response will activate the watcher
+ zkServer.addOperationResponse(new ZooStatResponse);
+ ChildEventCountingWatcher wobject1;
+ Stat stat;
+ // add a node watch
+ int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+ typedef ZooGetChildrenResponse::StringVector ZooVector;
+ zkServer.addOperationResponse(new ZooGetChildrenResponse(
+ Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+ ));
+ ChildEventCountingWatcher wobject2;
+ String_vector children;
+ // add a child watch on the same node
+ rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
+ // check rc before touching 'children': it is not initialized above, so
+ // it must only be released once the call is known to have succeeded
+ CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+ deallocate_String_vector(&children);
+
+ // we are all set now; let's trigger the watches
+ zkServer.addRecvResponse(new ZNodeEvent(CHILD_EVENT,"/a"));
+ // make sure the watchers have been processed
+ CPPUNIT_ASSERT(ensureCondition(
+ deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+ // only the child watch (wobject2) is expected to fire
+ CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
+ CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
+ CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+ }
+
+#endif //THREADED
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_watchers);
Propchange: hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc Thu Jul 24 14:46:30 2008
@@ -38,7 +38,7 @@
CPPUNIT_TEST(testCloseFromWatcher1);
CPPUNIT_TEST_SUITE_END();
zhandle_t *zh;
- static void watcher(zhandle_t *, int, int, const char *){}
+ static void watcher(zhandle_t *, int, int, const char *,void*){}
public:
void setUp()
{
@@ -59,7 +59,7 @@
virtual void onSessionExpired(zhandle_t* zh){
memcpy(&lzh,zh,sizeof(lzh));
if(callClose_)
- rc=zookeeper_close(zh);
+ rc=zookeeper_close(zh);
}
zhandle_t lzh;
bool callClose_;
@@ -88,7 +88,7 @@
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
- CPPUNIT_ASSERT_EQUAL(3,freeMock.callCounter);
+ CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter);
}
void testCloseUnconnected1()
{
@@ -236,7 +236,7 @@
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
- CPPUNIT_ASSERT_EQUAL(4,freeMock.callCounter);
+ CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter);
// threads
CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->io));
CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->completion));
@@ -410,6 +410,8 @@
CPPUNIT_ASSERT(zh!=0);
CPPUNIT_ASSERT(ensureCondition(SessionExpired(zh),1000)<1000);
CPPUNIT_ASSERT(ensureCondition(IOThreadStopped(zh),1000)<1000);
+ // make sure the watcher has been processed
+ CPPUNIT_ASSERT(ensureCondition(closeAction.isWatcherTriggered(),1000)<1000);
// make sure the threads have not been destroyed yet
adaptor_threads* adaptor=(adaptor_threads*)zh->adaptor_priv;
CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getDestroyCounter(adaptor->io));
Modified: hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc Thu Jul 24 14:46:30 2008
@@ -49,7 +49,7 @@
CPPUNIT_TEST_SUITE_END();
zhandle_t *zh;
MockPthreadsNull* pthreadMock;
- static void watcher(zhandle_t *, int , int , const char *){}
+ static void watcher(zhandle_t *, int , int , const char *,void*){}
public:
Zookeeper_init():zh(0),pthreadMock(0){}
Added: hadoop/zookeeper/trunk/src/c/tests/Vector.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/Vector.h?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/Vector.h (added)
+++ hadoop/zookeeper/trunk/src/c/tests/Vector.h Thu Jul 24 14:46:30 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _VECTOR_UTIL_H
+#define _VECTOR_UTIL_H
+
+#include <vector>
+
+// Streams a vector as "[a,b,c]" ("[]" when empty). The separator is written
+// before each element except the first instead of seeking back over a trailing
+// comma: seekp() fails (and sets failbit) on non-seekable streams like std::cout.
+template <class U>
+std::ostream& operator<<(std::ostream& os,const std::vector<U>& c){
+    os<<"[";
+    for(typename std::vector<U>::const_iterator it=c.begin();it!=c.end();++it){
+        if(it!=c.begin()) os<<",";
+        os<<*it;
+    }
+    os<<"]";
+    return os;
+}
+
+#endif // _VECTOR_UTIL_H
Propchange: hadoop/zookeeper/trunk/src/c/tests/Vector.h
------------------------------------------------------------------------------
svn:eol-style = native