You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ak...@apache.org on 2008/07/24 23:46:31 UTC

svn commit: r679557 [2/3] - in /hadoop/zookeeper/trunk/src/c: ./ include/ src/ src/hashtable/ tests/

Modified: hadoop/zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zookeeper.c?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ hadoop/zookeeper/trunk/src/c/src/zookeeper.c Thu Jul 24 14:46:30 2008
@@ -29,6 +29,8 @@
 #include <proto.h>
 #include "zk_adaptor.h"
 #include "zk_log.h"
+#include "zk_hashtable.h"
+
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -55,50 +57,50 @@
 const int EPHEMERAL = 1 << 0;
 const int SEQUENCE = 1 << 1;
 
-const int EXPIRED_SESSION_STATE = -112;
-const int AUTH_FAILED_STATE = -113;
-const int CONNECTING_STATE = 1;
-const int ASSOCIATING_STATE = 2;
-const int CONNECTED_STATE = 3;
+const int EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
+const int AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
+const int CONNECTING_STATE = CONNECTING_STATE_DEF;
+const int ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
+const int CONNECTED_STATE = CONNECTED_STATE_DEF;
 static __attribute__ ((unused)) const char* state2String(int state){
     switch(state){
     case 0:
         return "CLOSED_STATE";
-    case 1 /*CONNECTING_STATE*/:
+    case CONNECTING_STATE_DEF:
         return "CONNECTING_STATE";
-    case 2 /*ASSOCIATING_STATE*/:
+    case ASSOCIATING_STATE_DEF:
         return "ASSOCIATING_STATE";
-    case 3 /*CONNECTED_STATE*/:
+    case CONNECTED_STATE_DEF:
         return "CONNECTED_STATE";
-    case -112 /*EXPIRED_SESSION_STATE*/:
+    case EXPIRED_SESSION_STATE_DEF:
         return "EXPIRED_SESSION_STATE";
-    case -113 /*AUTH_FAILED_STATE*/:
+    case AUTH_FAILED_STATE_DEF:
         return "AUTH_FAILED_STATE";
     }
     return "INVALID_STATE";
 }
 
-const int CREATED_EVENT = 1;
-const int DELETED_EVENT = 2;
-const int CHANGED_EVENT = 3;
-const int CHILD_EVENT = 4;
-const int SESSION_EVENT = -1;
-const int NOTWATCHING_EVENT = -2;
+const int CREATED_EVENT = CREATED_EVENT_DEF;
+const int DELETED_EVENT = DELETED_EVENT_DEF;
+const int CHANGED_EVENT = CHANGED_EVENT_DEF;
+const int CHILD_EVENT = CHILD_EVENT_DEF;
+const int SESSION_EVENT = SESSION_EVENT_DEF;
+const int NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF;
 static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
     switch(ev){
     case 0:
         return "ERROR_EVENT";
-    case 1 /*CREATED_EVENT*/:
+    case CREATED_EVENT_DEF:
         return "CREATED_EVENT";
-    case 2 /*DELETED_EVENT*/:
+    case DELETED_EVENT_DEF:
         return "DELETED_EVENT";
-    case 3 /*CHANGED_EVENT*/:
+    case CHANGED_EVENT_DEF:
         return "CHANGED_EVENT";
-    case 4 /*CHILD_EVENT*/:
+    case CHILD_EVENT_DEF:
         return "CHILD_EVENT";
-    case -1 /*SESSION_EVENT*/:
+    case SESSION_EVENT_DEF:
         return "SESSION_EVENT";
-    case -2 /*NOTWATCHING_EVENT*/:
+    case NOTWATCHING_EVENT_DEF:
         return "NOTWATCHING_EVENT";
     }
     return "INVALID_EVENT";
@@ -126,19 +128,6 @@
 #define COMPLETION_ACLLIST 4
 #define COMPLETION_STRING 5
 
-const char*err2string(int err);
-static const char* format_endpoint_info(const struct sockaddr* ep);
-static const char* format_current_endpoint_info(zhandle_t* zh);
-static int add_completion(zhandle_t *zh, int xid, int completion_type, 
-        const void *dc, const void *data, int add_to_front);
-static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
-    const char* format,...);
-static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
-
-static int disable_conn_permute=0; // permute enabled by default
-
-static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
-    
 typedef struct _completion_list {
     int xid;
     int completion_type; /* one of the COMPLETION_* values */
@@ -153,8 +142,30 @@
     const void *data;
     buffer_list_t *buffer;
     struct _completion_list *next;
+    watcher_registration_t* watcher;
 } completion_list_t;
 
+const char*err2string(int err);
+static const char* format_endpoint_info(const struct sockaddr* ep);
+static const char* format_current_endpoint_info(zhandle_t* zh);
+
+/* completion routine forward declarations */
+static int add_completion(zhandle_t *zh, int xid, int completion_type, 
+        const void *dc, const void *data, int add_to_front,watcher_registration_t* wo);
+static completion_list_t* create_completion_entry(int xid, int completion_type,
+        const void *dc, const void *data,watcher_registration_t* wo);
+static void destroy_completion_entry(completion_list_t* c);
+static void queue_completion(completion_head_t *list, completion_list_t *c,
+        int add_to_front);
+
+static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
+    const char* format,...);
+static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
+
+static int disable_conn_permute=0; // permute enabled by default
+
+static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
+
 const void *zoo_get_context(zhandle_t *zh) 
 {
     return zh->context;
@@ -194,6 +205,16 @@
 {
     return (zh->state<0)? ZINVALIDSTATE: ZOK;
 }
+
+int exists_result_checker(int rc)
+{
+    return rc==ZOK ||rc == ZNONODE;
+}
+
+int default_result_checker(int rc)
+{
+    return rc==ZOK;
+}
 /**
  * Frees and closes everything associated with a handle,
  * including the handle itself.
@@ -219,6 +240,8 @@
         zh->addrs = NULL;
     }
     free_auth_info(&zh->auth);
+    destroy_zk_hashtable(zh->active_node_watchers);
+    destroy_zk_hashtable(zh->active_child_watchers);
 }
 
 static void setup_random()
@@ -359,7 +382,7 @@
     return &zh->client_id;
 }
 
-static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4){}
+static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){}
 
 watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
 {
@@ -412,9 +435,13 @@
     zh->last_zxid = 0;
     zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
     zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
+    zh->active_node_watchers=create_zk_hashtable();
+    zh->active_child_watchers=create_zk_hashtable();
+    
     if (adaptor_init(zh) == -1) {
         goto abort;
     }
+    
     return zh;
 abort:
     errnosave=errno;
@@ -675,7 +702,7 @@
                 break;
             }
         }
-        free(cptr);
+        destroy_completion_entry(cptr);
     }
 }
 
@@ -1116,11 +1143,6 @@
     fprintf(LOGSTREAM,"end\n");    
 }
 
-static completion_list_t* create_completion_entry(int xid, int completion_type,
-        const void *dc, const void *data);
-static void queue_completion(completion_head_t *list, completion_list_t *c,
-        int add_to_front);
-
 #ifdef THREADED
 // IO thread queues session events to be processed by the completion thread
 int queue_session_event(zhandle_t *zh, int state)
@@ -1141,12 +1163,7 @@
         close_buffer_oarchive(&oa, 1);
         goto error;
     }
-    if ((cptr=calloc(1,sizeof(*cptr)))==NULL) {
-        LOG_ERROR(("out of memory"));
-        close_buffer_oarchive(&oa, 1);
-        goto error;
-    }
-    cptr->xid = WATCHER_EVENT_XID;
+    cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
     cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
     cptr->buffer->curr_offset = get_buffer_len(oa);
     if (!cptr->buffer) {
@@ -1180,6 +1197,8 @@
     return cptr;
 }
 
+
+/* handles async completion (both single- and multithreaded) */
 void process_completions(zhandle_t *zh)
 {
     completion_list_t *cptr;
@@ -1201,7 +1220,7 @@
             /* This is a notification so there aren't any pending requests */
             LOG_DEBUG(("Calling a watcher for node [%s], event=%s",
                  (evt.path==NULL?"NULL":evt.path),watcherEvent2String(type)));
-            zh->watcher(zh, type, state, evt.path);
+            deliverWatchers(zh,type,state,evt.path);
             deallocate_WatcherEvent(&evt);
         } else {
             int rc = hdr.err;
@@ -1271,9 +1290,9 @@
                 }
                 break;
             }
-            free_buffer(cptr->buffer);
-            free(cptr);
+            activateWatcher(cptr->watcher,rc);
         }
+        destroy_completion_entry(cptr);
         close_buffer_iarchive(&ia);
     }
 }
@@ -1331,7 +1350,7 @@
         zh->last_zxid = hdr.zxid;
         
         if (hdr.xid == WATCHER_EVENT_XID) {
-            completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0);
+            completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
             c->buffer = bptr;
             queue_completion(&zh->completions_to_process, c, 0);
         } else if(hdr.xid == AUTH_XID){
@@ -1378,10 +1397,10 @@
                 sc->rc = rc;
                 switch(cptr->completion_type) {
                 case COMPLETION_DATA:
+                    LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
                     if (rc==0) {
                         struct GetDataResponse res;
                         int len;
-                        LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
                         deserialize_GetDataResponse(ia, "reply", &res);
                         if (res.data.len <= sc->u.data.buff_len) {
                             len = res.data.len;
@@ -1395,18 +1414,18 @@
                     }
                     break;
                 case COMPLETION_STAT:
+                    LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
                     if (rc == 0) {
                         struct SetDataResponse res;
-                        LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
                         deserialize_SetDataResponse(ia, "reply", &res);
                         sc->u.stat = res.stat;
                         deallocate_SetDataResponse(&res);
                     }
                     break;
                 case COMPLETION_STRINGLIST:
+                    LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
                     if (rc == 0) {
                         struct GetChildrenResponse res;
-                        LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
                         deserialize_GetChildrenResponse(ia, "reply", &res);
                         sc->u.strs = res.children;
                         /* We don't deallocate since we are passing it back */
@@ -1414,10 +1433,10 @@
                     }
                     break;
                 case COMPLETION_STRING:
+                    LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
                     if (rc == 0) {
                         struct CreateResponse res;
                         int len;
-                        LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
                         deserialize_CreateResponse(ia, "reply", &res);
                         if (sc->u.str.str_len > strlen(res.path)) {
                             len = strlen(res.path);
@@ -1430,9 +1449,9 @@
                     }
                     break;
                 case COMPLETION_ACLLIST:
+                    LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
                     if (rc == 0) {
                         struct GetACLResponse res;
-                        LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
                         deserialize_GetACLResponse(ia, "reply", &res);
                         cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
                         sc->u.acl.acl = res.acl;
@@ -1445,6 +1464,7 @@
                     LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
                     break;
                 }
+                activateWatcher(cptr->watcher,rc);
                 notify_sync_completion(sc);
                 free_buffer(bptr);
                 zh->outstanding_sync--;
@@ -1468,8 +1488,30 @@
     return 0;
 }
 
+static watcher_registration_t* create_watcher_registration(const char* path,
+        result_checker_fn checker,watcher_fn watcher,void* ctx,
+        zk_hashtable* activeMap){
+    watcher_registration_t* wo;
+    if(watcher==0)
+        return 0;
+    wo=calloc(1,sizeof(watcher_registration_t));
+    wo->path=strdup(path);
+    wo->watcher=watcher;
+    wo->context=ctx;
+    wo->checker=checker==0?default_result_checker:checker;
+    wo->activeMap=activeMap;
+    return wo;
+}
+
+static void destroy_watcher_registration(watcher_registration_t* wo){
+    if(wo!=0){
+        free((void*)wo->path);
+        free(wo);
+    }
+}
+
 static completion_list_t* create_completion_entry(int xid, int completion_type, 
-        const void *dc, const void *data)
+        const void *dc, const void *data,watcher_registration_t* wo)
 {
     completion_list_t *c = calloc(1,sizeof(completion_list_t));
     if (!c) {
@@ -1499,15 +1541,24 @@
         break;
     }
     c->xid = xid;
-    c->next = 0;
+    c->watcher = wo;
 
     return c;
 }
 
+static void destroy_completion_entry(completion_list_t* c){
+    if(c!=0){
+        if(c->buffer!=0)
+            free_buffer(c->buffer);
+        destroy_watcher_registration(c->watcher);
+        free(c);
+    }
+}
+
 static void queue_completion(completion_head_t *list, completion_list_t *c,
         int add_to_front)
 {
-     c->next = 0;
+    c->next = 0;
     /* appending a new entry to the back of the list */
     lock_completion_list(list);
     if (list->last) {
@@ -1530,10 +1581,11 @@
 }
 
 static int add_completion(zhandle_t *zh, int xid, int completion_type,
-        const void *dc, const void *data, int add_to_front)
+        const void *dc, const void *data, int add_to_front,
+        watcher_registration_t* wo)
 {
     completion_list_t *c =create_completion_entry(xid, completion_type, dc,
-            data);
+            data,wo);
     if (!c) 
         return ZSYSTEMERROR;
     queue_completion(&zh->sent_requests, c, add_to_front);
@@ -1544,39 +1596,39 @@
 }
 
 static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
-        const void *data)
+        const void *data,watcher_registration_t* wo)
 {
-    return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0);
+    return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0,wo);
 }
 
 static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
-        const void *data)
+        const void *data,watcher_registration_t* wo)
 {
-    return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0);
+    return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0,wo);
 }
 
 static int add_strings_completion(zhandle_t *zh, int xid,
-        strings_completion_t dc, const void *data)
+        strings_completion_t dc, const void *data,watcher_registration_t* wo)
 {
-    return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0);
+    return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0,wo);
 }
 
 static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
         const void *data)
 {
-    return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0);
+    return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0,0);
 }
 
 static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
         const void *data)
 {
-    return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0);
+    return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0,0);
 }
 
 static int add_string_completion(zhandle_t *zh, int xid,
         string_completion_t dc, const void *data)
 {
-    return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0);
+    return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0,0);
 }
 
 int zookeeper_close(zhandle_t *zh)
@@ -1625,9 +1677,16 @@
 int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
         const void *data)
 {
+    return zoo_awget(zh,path,watch?zh->watcher:0,zh->context,dc,data);
+}
+
+int zoo_awget(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx, 
+        data_completion_t dc, const void *data)
+{
     struct oarchive *oa; 
     struct RequestHeader h = { .xid = get_xid(), .type = GETDATA_OP};
-    struct GetDataRequest req = { (char*)path, watch };
+    struct GetDataRequest req = { (char*)path, watcher!=0 };
     int rc;
     
     if (zh==0 || path==0)
@@ -1638,7 +1697,9 @@
     rc = serialize_RequestHeader(oa, "header", &h);
     rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
     enter_critical(zh);
-    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data);
+    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
+        create_watcher_registration(path,0,watcher,watcherCtx,
+                zh->active_node_watchers));
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
@@ -1673,7 +1734,7 @@
     rc = serialize_RequestHeader(oa, "header", &h);
     rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
     enter_critical(zh);
-    rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data);
+    rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0);
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
@@ -1761,11 +1822,18 @@
 }
 
 int zoo_aexists(zhandle_t *zh, const char *path, int watch,
+        stat_completion_t sc, const void *data)
+{
+    return zoo_awexists(zh,path,watch?zh->watcher:0,zh->context,sc,data);
+}
+
+int zoo_awexists(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx,
         stat_completion_t completion, const void *data)
 {
     struct oarchive *oa;
     struct RequestHeader h = { .xid = get_xid(), .type = EXISTS_OP };
-    struct ExistsRequest req;
+    struct ExistsRequest req = {(char*)path, watcher!=0 };
     int rc;
     
     if (zh==0 || path==0)
@@ -1773,12 +1841,12 @@
     if (is_unrecoverable(zh))
         return ZINVALIDSTATE;
     oa = create_buffer_oarchive();
-    req.path = (char*)path;
-    req.watch = watch;
     rc = serialize_RequestHeader(oa, "header", &h);
     rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
     enter_critical(zh);
-    rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data);
+    rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data,
+        create_watcher_registration(path,exists_result_checker,
+                watcher,watcherCtx,zh->active_node_watchers));
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
@@ -1793,11 +1861,18 @@
 }
 
 int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
-        strings_completion_t completion, const void *data)
+        strings_completion_t dc, const void *data)
+{
+    return zoo_awget_children(zh,path,watch?zh->watcher:0,zh->context,dc,data);
+}
+
+int zoo_awget_children(zhandle_t *zh, const char *path,
+        watcher_fn watcher, void* watcherCtx, 
+        strings_completion_t dc, const void *data)
 {
     struct oarchive *oa;
     struct RequestHeader h = { .xid = get_xid(), .type = GETCHILDREN_OP};
-    struct GetChildrenRequest req;
+    struct GetChildrenRequest req={(char*)path, watcher!=0 };
     int rc;
     
     if (zh==0 || path==0)
@@ -1805,12 +1880,12 @@
     if (is_unrecoverable(zh))
         return ZINVALIDSTATE;
     oa = create_buffer_oarchive();
-    req.path = (char*)path;
-    req.watch = watch;
     rc = serialize_RequestHeader(oa, "header", &h);
     rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
     enter_critical(zh);
-    rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, completion, data);
+    rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, dc, data,
+            create_watcher_registration(path,0,watcher,watcherCtx,
+                    zh->active_child_watchers));
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
@@ -2131,12 +2206,18 @@
 
 int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
 {
+    return zoo_wexists(zh,path,watch?zh->watcher:0,zh->context,stat);
+}
+
+int zoo_wexists(zhandle_t *zh, const char *path,
+        watcher_fn watcher, void* watcherCtx, struct Stat *stat)
+{
     struct sync_completion *sc = alloc_sync_completion();
     int rc;
     if (!sc) {
         return ZSYSTEMERROR;
     }
-    rc=zoo_aexists(zh, path, watch, SYNCHRONOUS_MARKER, sc);
+    rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc);
     if(rc==ZOK){
         wait_sync_completion(sc);
         rc = sc->rc;
@@ -2145,12 +2226,20 @@
         }
     }
     free_sync_completion(sc);
-    return rc;
+    return rc;    
 }
 
 int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
         int* buffer_len, struct Stat *stat)
 {
+    return zoo_wget(zh,path,watch?zh->watcher:0,zh->context,
+            buffer,buffer_len,stat);
+}
+
+int zoo_wget(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx, 
+        char *buffer, int* buffer_len, struct Stat *stat)
+{
     struct sync_completion *sc;
     int rc=0;
 
@@ -2161,7 +2250,7 @@
 
     sc->u.data.buffer = buffer;
     sc->u.data.buff_len = *buffer_len;
-    rc=zoo_aget(zh, path, watch, SYNCHRONOUS_MARKER, sc);
+    rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
     if(rc==ZOK){
         wait_sync_completion(sc);
         rc = sc->rc;
@@ -2195,12 +2284,19 @@
 int zoo_get_children(zhandle_t *zh, const char *path, int watch,
         struct String_vector *strings)
 {
+    return zoo_wget_children(zh,path,watch?zh->watcher:0,zh->context,strings);
+}
+
+int zoo_wget_children(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx,
+        struct String_vector *strings)
+{
     struct sync_completion *sc = alloc_sync_completion();
     int rc;
     if (!sc) {
         return ZSYSTEMERROR;
     }
-    rc=zoo_aget_children(zh, path, watch, SYNCHRONOUS_MARKER, sc);
+    rc=zoo_awget_children(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
     if(rc==ZOK){
         wait_sync_completion(sc);
         rc = sc->rc;

Added: hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h (added)
+++ hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h Thu Jul 24 14:46:30 2008
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _COLLECTION_UTIL_H_
+#define _COLLECTION_UTIL_H_
+
+/**
+ * \file
+ * CollectionBuilder and DictionaryBuilder classes and collection utility functions
+ */
+
+namespace Util 
+{
+
+// *********************************************************
+/** A shortcut to use for building collections.
+ * This class is a wrapper around standard STL collection containers such as vector.
+ * It allows one to conveniently build collections at the variable initialization time:
+ * \code
+ * #include "CollectionUtil.h"
+ * #include "Vector.h"  // for ostream << operator overload for STL vector
+ * using Util;
+ * 
+ * int main()
+ * {
+ *   typedef vector<string> MyVector;
+ *   MyVector myVector=CollectionBuilder<MyVector>()("str1")("str2")("str3");
+ *   cout<<myVector;
+ *   // the following output will be produced:
+ *   // [str1,str2,str3]
+ * }
+ * \endcode
+ */
+template <class CONT>
+class CollectionBuilder
+{
+public:
+  /// Type of the collection container. 
+  typedef CONT CollectionType;
+  /// Container's value type.
+  typedef typename CollectionType::value_type value_type;
+  /// Container's constant iterator type.
+  typedef typename CollectionType::const_iterator const_iterator;
+  /// Container's size type.
+  typedef typename CollectionType::size_type size_type;
+
+  /** Operator function call overload to allow call chaining.
+   * \param value the value to be inserted into the container
+   */
+  CollectionBuilder<CONT>& operator()(const value_type& value){
+    return push_back(value);
+  }
+  /** Same as regular STL push_back() but allows call chaining.
+   * \param value the value to be inserted into the container
+   */
+  CollectionBuilder<CONT>& push_back(const value_type& value){
+    collection_.push_back(value);
+    return *this;
+  }
+  /// \name Standard STL container interface
+  /// @{
+  const_iterator begin() const{return collection_.begin();}
+  const_iterator end() const{return collection_.end();}
+  size_type size() const{return collection_.size();}
+  void clear() {collection_.clear();}
+  ///@}
+  /// Explicit typecast operator.
+  operator const CollectionType&() const {return collection_;}
+private:
+  /// \cond PRIVATE
+  CollectionType collection_;
+  /// \endcond
+};
+
+
+// *********************************************************
+/** A shortcut to use for building dictionaries.
+ * This class is a wrapper around standard STL associative containers such as map.
+ * It allows one to conveniently build dictionaries at the variable initialization time:
+ * \code
+ * #include "CollectionUtil.h"
+ * #include "Map.h"  // for ostream << operator overload for STL map
+ * using Util;
+ * 
+ * int main()
+ * {
+ *   typedef map<string,int> MyMap;
+ *   MyMap myMap=DictionaryBuilder<MyMap>()("str1",1)("str2",2)("str3",3);
+ *   cout<<myMap;
+ *   // the following output will be produced:
+ *   // [str1=1,str2=2,str3=3]
+ * }
+ * \endcode
+ */
+template <class CONT>
+class DictionaryBuilder
+{
+public:
+  /// The type of the associative container
+  typedef CONT DictionaryType;
+  /// Container's element type (usually a pair<key_type,mapped_type>)
+  typedef typename DictionaryType::value_type value_type;
+  /// Container's key type
+  typedef typename DictionaryType::key_type key_type;
+  /// Container's value type 
+  typedef typename DictionaryType::mapped_type mapped_type;
+  /// Container's constant iterator type 
+  typedef typename DictionaryType::const_iterator const_iterator;
+  /// Container's writable iterator type   
+  typedef typename DictionaryType::iterator iterator;
+  /// Container's size type
+  typedef typename DictionaryType::size_type size_type;
+ 
+  /** Operator function call overload to allow call chaining.
+   * \param key the value key to be inserted
+   * \param value the value to be inserted into the container
+   * \return a non-const reference to self
+   */
+  DictionaryBuilder<CONT>& operator()(const key_type& key,const mapped_type& value){
+    dict_.insert(value_type(key,value));
+    return *this;
+  }
+  /** Lookup value by key.
+   * \param key the key associated with the value.
+   * \return a non-const iterator pointing to the element whose key matched the \a key parameter
+   */
+  iterator find(const key_type& key){
+    return dict_.find(key);
+  }
+  /** Lookup value by key.
+   * \param key the key associated with the value.
+   * \return a const iterator pointing to the element whose key matched the \a key parameter
+   */
+  const_iterator find(const key_type& key) const{
+    return dict_.find(key);
+  }
+
+  /// \name Standard STL container interface
+  /// @{
+  const_iterator begin() const{return dict_.begin();}
+  const_iterator end() const{return dict_.end();}
+  size_type size() const{return dict_.size();}
+  void clear() {dict_.clear();}
+  ///@}
+  /// Explicit typecast operator.
+  operator const DictionaryType&() const {return dict_;}
+private:
+  DictionaryType dict_;
+};
+
+
+// ***********************************************************
+/** Deletes all dynamically allocated elements of a collection.
+ * C::value_type is expected to be a pointer to a dynamically allocated object, or it won't compile.
+ * The function will iterate over all container elements and call delete for each of them.
+ * \param c a collection (vector,set) whose elements are being deleted.
+ */
+template <class C>
+void clearCollection(C& c){
+  for(typename C::const_iterator it=c.begin();it!=c.end();++it)
+    delete *it;
+  c.clear();
+}
+
+/** Deletes all dynamically allocated values of the assotiative container.
+ * The function expects the M::value_type to be a pair<..., ptr_to_type>, or it won't compile.
+ * It first deletes the objects pointed to by ptr_to_type
+ * and then clears (calls m.clear()) the container.
+ * \param m an associative container (map,hash_map) whose elements are being deleted.
+ */
+template <class M>
+void clearMap(M& m){
+  for(typename M::const_iterator it=m.begin();it!=m.end();++it)
+    delete it->second;
+  m.clear();
+}
+
+} // namespace Util
+
+
+#endif // _COLLECTION_UTIL_H_

Propchange: hadoop/zookeeper/trunk/src/c/tests/CollectionUtil.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc (added)
+++ hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc Thu Jul 24 14:46:30 2008
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+
+#include "CollectionUtil.h"
+using namespace Util;
+
+#include "Vector.h"
+using namespace std;
+
+#include "src/zk_hashtable.h"
+
+class Zookeeper_hashtable : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_hashtable);
+    CPPUNIT_TEST(testInsertElement1);
+    CPPUNIT_TEST(testInsertElement2);
+    CPPUNIT_TEST(testInsertElement3);
+    CPPUNIT_TEST(testContainsWatcher1);
+    CPPUNIT_TEST(testContainsWatcher2);
+    CPPUNIT_TEST(testCombineHashtable1);
+    CPPUNIT_TEST(testMoveMergeWatchers1);
+    CPPUNIT_TEST(testDeliverSessionEvent1);
+    CPPUNIT_TEST(testDeliverZnodeEvent1);
+    CPPUNIT_TEST_SUITE_END();
+
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
+    zk_hashtable *ht;
+    
+public:
+
+    void setUp()
+    {
+        ht=create_zk_hashtable();
+    }
+    
+    void tearDown()
+    {
+        destroy_zk_hashtable(ht);
+    }
+
+    static vector<int> getWatcherCtxAsVector(zk_hashtable* ht,const char* path){
+        watcher_object_t* wo=getFirstWatcher(ht,path);
+        vector<int> res;
+        while(wo!=0){
+            res.push_back((int)wo->context);
+            wo=wo->next;
+        }
+        return res;
+    }
+    
+    // insert 2 watchers for different paths
+    // verify that hashtable size is 2
+    void testInsertElement1()
+    {
+        CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht));
+        int res=insert_watcher_object(ht,"path1",
+                create_watcher_object(watcher,(void*)1));
+        CPPUNIT_ASSERT_EQUAL(1,res);
+        res=insert_watcher_object(ht,"path2",
+                create_watcher_object(watcher,(void*)1));
+        CPPUNIT_ASSERT_EQUAL(1,res);
+        CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht));
+        vector<int> expWatchers=CollectionBuilder<vector<int> >().push_back(1);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1"));
+        CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path2"));
+        clean_zk_hashtable(ht);
+        CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht));        
+    }
+    
+    // insert 2 different watchers for the same path;
+    // verify: hashtable element count is 1, and the watcher count for the path
+    // is 2
+    void testInsertElement2()
+    {
+        int res=insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)1));
+        CPPUNIT_ASSERT_EQUAL(1,res);
+        res=insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+        CPPUNIT_ASSERT_EQUAL(1,res);
+        CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht));
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1"));
+        vector<int> expWatchers=CollectionBuilder<vector<int> >().
+            push_back(2).push_back(1);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1"));
+    }
+
+    // insert 2 identical watchers for the same path;
+    // verify: hashtable element count is 1, the watcher count for the path is 1
+    void testInsertElement3()
+    {
+        watcher_object_t wobject;
+        wobject.watcher=watcher;
+        wobject.context=(void*)1;
+        
+        int res=insert_watcher_object(ht,"path1",clone_watcher_object(&wobject));
+        CPPUNIT_ASSERT_EQUAL(1,res);
+        watcher_object_t* wo=clone_watcher_object(&wobject);
+        res=insert_watcher_object(ht,"path1",wo);
+        CPPUNIT_ASSERT_EQUAL(0,res);
+        CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht));
+        CPPUNIT_ASSERT_EQUAL(1,get_watcher_count(ht,"path1"));
+        vector<int> expWatchers=CollectionBuilder<vector<int> >().push_back(1);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(ht,"path1"));
+        // must delete the object that wasn't inserted!
+        free(wo);
+    }
+
+    // verify: the watcher is found in the table
+    void testContainsWatcher1()
+    {
+        watcher_object_t expected;
+        expected.watcher=watcher;
+        expected.context=(void*)1;
+        
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+        insert_watcher_object(ht,"path2",clone_watcher_object(&expected));
+        
+        CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht));
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1"));
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2"));
+
+        int res=contains_watcher(ht,&expected);
+        CPPUNIT_ASSERT(res==1);
+    }
+
+    // verify: the watcher is not found
+    void testContainsWatcher2()
+    {
+        watcher_object_t expected;
+        expected.watcher=watcher;
+        expected.context=(void*)1;
+        
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5));
+        
+        CPPUNIT_ASSERT_EQUAL(2,get_element_count(ht));
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path1"));
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2"));
+
+        int res=contains_watcher(ht,&expected);
+        CPPUNIT_ASSERT(res==0);
+    }
+
+    void testCombineHashtable1()
+    {
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5));
+        
+        zk_hashtable* ht2=create_zk_hashtable();
+
+        insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)2));
+        insert_watcher_object(ht2,"path2",create_watcher_object(watcher,(void*)6));
+        insert_watcher_object(ht2,"path3",create_watcher_object(watcher,(void*)2));
+
+        zk_hashtable* res=combine_hashtables(ht,ht2);
+        
+        CPPUNIT_ASSERT_EQUAL(3,get_element_count(res));
+        // path1 --> 2,3
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(res,"path1"));
+        vector<int> expWatchers1=CollectionBuilder<vector<int> >().
+            push_back(2).push_back(3);
+        CPPUNIT_ASSERT_EQUAL(expWatchers1,getWatcherCtxAsVector(res,"path1"));
+        // path2 --> 4,5,6
+        CPPUNIT_ASSERT_EQUAL(3,get_watcher_count(res,"path2"));
+        vector<int> expWatchers2=CollectionBuilder<vector<int> >().
+            push_back(6).push_back(4).push_back(5);
+        CPPUNIT_ASSERT_EQUAL(expWatchers2,getWatcherCtxAsVector(res,"path2"));
+        // path3 --> 2
+        CPPUNIT_ASSERT_EQUAL(1,get_watcher_count(res,"path3"));
+        vector<int> expWatchers3=CollectionBuilder<vector<int> >().push_back(2);
+        CPPUNIT_ASSERT_EQUAL(expWatchers3,getWatcherCtxAsVector(res,"path3"));
+
+        destroy_zk_hashtable(ht2);
+        destroy_zk_hashtable(res);
+    }
+    
+    void testMoveMergeWatchers1()
+    {
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)2));
+        insert_watcher_object(ht,"path1",create_watcher_object(watcher,(void*)3));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)4));
+        insert_watcher_object(ht,"path2",create_watcher_object(watcher,(void*)5));
+        
+        zk_hashtable* ht2=create_zk_hashtable();
+
+        insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)2));
+        insert_watcher_object(ht2,"path1",create_watcher_object(watcher,(void*)6));
+
+        zk_hashtable* res=move_merge_watchers(ht,ht2,"path1");
+        
+        CPPUNIT_ASSERT_EQUAL(1,get_element_count(res));
+        CPPUNIT_ASSERT_EQUAL(3,get_watcher_count(res,"path1"));
+        vector<int> expWatchers=CollectionBuilder<vector<int> >().
+            push_back(6).push_back(2).push_back(3);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,getWatcherCtxAsVector(res,"path1"));
+
+        // make sure the path entry has been deleted from the source hashtables
+        CPPUNIT_ASSERT_EQUAL(1,get_element_count(ht));
+        CPPUNIT_ASSERT_EQUAL(2,get_watcher_count(ht,"path2"));
+        CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht2));
+        
+        destroy_zk_hashtable(ht2);
+        destroy_zk_hashtable(res);
+    }
+
+    static void iterWatcher(zhandle_t *zh, int type, int state, 
+            const char* path,void* ctx){
+        vector<int>* res=reinterpret_cast<vector<int>*>(zh);
+        res->push_back((int)ctx);
+    }
+    
+    void testDeliverSessionEvent1(){
+        insert_watcher_object(ht,"path1",create_watcher_object(iterWatcher,(void*)2));
+        insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)3));
+        insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)4));
+        insert_watcher_object(ht,"path3",create_watcher_object(iterWatcher,(void*)5));
+        
+        vector<int> res;
+        deliver_session_event(ht,(zhandle_t*)&res,10,20);
+        vector<int> expWatchers=CollectionBuilder<vector<int> >().
+            push_back(4).push_back(3).push_back(5).push_back(2);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,res);
+    }
+    
+    void testDeliverZnodeEvent1(){
+        insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)3));
+        insert_watcher_object(ht,"path2",create_watcher_object(iterWatcher,(void*)4));
+        
+        vector<int> res;
+        deliver_znode_event(ht,(zhandle_t*)&res,"path2",10,20);
+        vector<int> expWatchers=CollectionBuilder<vector<int> >().
+            push_back(4).push_back(3);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,res);
+        expWatchers.clear();
+        res.clear();
+        // non-existent path
+        deliver_znode_event(ht,(zhandle_t*)&res,"path100",10,20);
+        CPPUNIT_ASSERT_EQUAL(expWatchers,res);
+        // make sure the path entry has been deleted from the source hashtable
+        CPPUNIT_ASSERT_EQUAL(0,get_element_count(ht));
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_hashtable);

Propchange: hadoop/zookeeper/trunk/src/c/tests/TestHashtable.cc
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestOperations.cc Thu Jul 24 14:46:30 2008
@@ -41,7 +41,7 @@
     CPPUNIT_TEST_SUITE_END();
     zhandle_t *zh;
 
-    static void watcher(zhandle_t *, int, int, const char *){}
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
 public: 
     void setUp()
     {
@@ -256,7 +256,7 @@
         zkServer.addRecvResponse(new PingResponse);
         rc=zookeeper_interest(zh,&fd,&interest,&tv);
         CPPUNIT_ASSERT_EQUAL(ZOK,rc);
-        // sleep for a short while (10 ms)
+        // pseudo-sleep for a short while (10 ms)
         timeMock.millitick(10);
         rc=zookeeper_process(zh,interest);
         CPPUNIT_ASSERT_EQUAL(ZOK,rc);
@@ -575,12 +575,26 @@
             changed_=true;
             if(path!=0) path_=path;
         }
+        // this predicate checks if CHANGE_EVENT event type was triggered, unlike
+        // the isWatcherTriggered() that returns true whenever a watcher is triggered
+        // regardless of the event type
         SyncedBoolCondition isNodeChangedTriggered() const{
             return SyncedBoolCondition(changed_,mx_);
         }
         bool changed_;
         string path_;
     };
+    
+    class AsyncWatcherCompletion: public AsyncCompletion{
+    public:
+        AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
+        virtual void statCompl(int rc, const Stat *stat){
+            // we received a server response, now enqueue a watcher event
+            // to trigger the watcher
+            zkServer_.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+        }
+        ZookeeperServer& zkServer_;
+    };
     // verify that async watcher is called for znode events (CREATED, DELETED etc.)
     void testAsyncWatcher1(){
         Mock_gettimeofday timeMock;
@@ -596,9 +610,14 @@
         CPPUNIT_ASSERT(zh!=0);
         // make sure the client has connected
         CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
-
-        // trigger the watcher
-        zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+        
+        // set the watcher
+        AsyncWatcherCompletion completion(zkServer);
+        // prepare a response for the zoo_aexists() request
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
         CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
         CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);                
     }

Added: hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc (added)
+++ hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc Thu Jul 24 14:46:30 2008
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include "ZKMocks.h"
+#include "CollectionUtil.h"
+
+class Zookeeper_watchers : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_watchers);
+    CPPUNIT_TEST(testDefaultSessionWatcher1);
+    CPPUNIT_TEST(testDefaultSessionWatcher2);
+    CPPUNIT_TEST(testObjectSessionWatcher1);
+    CPPUNIT_TEST(testObjectSessionWatcher2);
+    CPPUNIT_TEST(testNodeWatcher1);
+    CPPUNIT_TEST(testChildWatcher1);
+    CPPUNIT_TEST(testChildWatcher2);
+    CPPUNIT_TEST_SUITE_END();
+
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
+    zhandle_t *zh;
+    
+public:
+
+    void setUp()
+    {
+        zoo_set_debug_level((ZooLogLevel)0); // disable logging
+        zoo_deterministic_conn_order(0);
+        zh=0;
+    }
+    
+    void tearDown()
+    {
+        zookeeper_close(zh);
+    }
+    
+    class ConnectionWatcher: public WatcherAction{
+    public:
+        ConnectionWatcher():connected_(false),counter_(0){}
+        virtual void onConnectionEstablished(zhandle_t*){
+            synchronized(mx_);
+            counter_++;
+            connected_=true;
+        }
+        SyncedBoolCondition isConnectionEstablished() const{
+            return SyncedBoolCondition(connected_,mx_);
+        }
+        bool connected_;
+        int counter_;
+    };
+
+    class DisconnectWatcher: public WatcherAction{
+    public:
+        DisconnectWatcher():disconnected_(false),counter_(0){}
+        virtual void onConnectionLost(zhandle_t*){
+            synchronized(mx_);
+            counter_++;
+            disconnected_=true;
+        }
+        SyncedBoolCondition isDisconnected() const{
+            return SyncedBoolCondition(disconnected_,mx_);
+        }
+        bool disconnected_;
+        int counter_;
+    };
+
+    class CountingDataWatcher: public WatcherAction{
+    public:
+        CountingDataWatcher():disconnected_(false),counter_(0){}
+        virtual void onNodeValueChanged(zhandle_t*,const char* path){
+            synchronized(mx_);
+            counter_++;
+        }
+        virtual void onConnectionLost(zhandle_t*){
+            synchronized(mx_);
+            counter_++;
+            disconnected_=true;
+        }
+        bool disconnected_;
+        int counter_;
+    };
+
+    class DeletionCountingDataWatcher: public WatcherAction{
+    public:
+        DeletionCountingDataWatcher():counter_(0){}
+        virtual void onNodeDeleted(zhandle_t*,const char* path){
+            synchronized(mx_);
+            counter_++;
+        }
+        int counter_;
+    };
+
+    class ChildEventCountingWatcher: public WatcherAction{
+    public:
+        ChildEventCountingWatcher():counter_(0){}
+        virtual void onChildChanged(zhandle_t*,const char* path){
+            synchronized(mx_);
+            counter_++;
+        }
+        int counter_;
+    };
+
+#ifndef THREADED
+    
+    // verify: the default watcher is called once for a session event
+    void testDefaultSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        ConnectionWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        
+        int fd=0;
+        int interest=0;
+        timeval tv;
+        // open the socket
+        int rc=zookeeper_interest(zh,&fd,&interest,&tv);        
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(CONNECTING_STATE,zoo_state(zh));
+        // send the handshake packet to the server
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);        
+        CPPUNIT_ASSERT_EQUAL(ASSOCIATING_STATE,zoo_state(zh));
+        // receive the server handshake response
+        rc=zookeeper_process(zh,interest);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // verify connected
+        CPPUNIT_ASSERT_EQUAL(CONNECTED_STATE,zoo_state(zh));
+        CPPUNIT_ASSERT(watcher.connected_);
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+    
+    // test case: connect to server, set a default watcher, disconnect from the server
+    // verify: the default watcher is called once
+    void testDefaultSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        // first operation
+        AsyncCompletion ignored;
+        zkServer.addOperationResponse(new ZooGetResponse("1",1));
+        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // this will process the response and activate the watcher
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        // now, disconnect
+        zkServer.setConnectionLost();
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc);
+        // verify disconnected
+        CPPUNIT_ASSERT(watcher.disconnected_);
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+    
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        CountingDataWatcher wobject;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/x/y/1",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // this will process the response and activate the watcher
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        // now, disconnect
+        zkServer.setConnectionLost();
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc);
+
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           set a def watcher on another node,disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        // set the default watcher
+        AsyncCompletion ignored;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        CountingDataWatcher wobject;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+        // disconnect now
+        zkServer.setConnectionLost();
+        rc=zookeeper_process(zh,ZOOKEEPER_READ);
+        CPPUNIT_ASSERT_EQUAL(ZCONNECTIONLOSS,rc);
+
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: register 2 node watches for different paths, trigger the watches
+    // verify: the data watchers are processed, the default watcher is not called
+    void testNodeWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DisconnectWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        CountingDataWatcher wobject1;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        CountingDataWatcher wobject2;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/a/b/c"));
+        zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+        // make sure all watchers have been processed
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+        
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+    // testcase: set up both a children and a data watchers on the node /a, then
+    //           delete the node by sending a DELETE_EVENT event
+    // verify: both watchers are triggered
+    void testChildWatcher1(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        DeletionCountingDataWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        DeletionCountingDataWatcher wobject1;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        DeletionCountingDataWatcher wobject2;
+        rc=zoo_awget_children(zh,"/a",activeWatcher,
+                &wobject2,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(DELETED_EVENT,"/a"));
+        // make sure the watchers have been processed
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+    // testcase: create both a child and data watch on the node /a, send a CHILD_EVENT
+    // verify: only the child watch triggered
+    void testChildWatcher2(){
+        Mock_gettimeofday timeMock;
+        ZookeeperServer zkServer;
+        // must call zookeeper_close() while all the mocks are in scope
+        CloseFinally guard(&zh);
+
+        ChildEventCountingWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // simulate connected state
+        forceConnected(zh);
+        
+        AsyncCompletion ignored;
+        ChildEventCountingWatcher wobject1;
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        ChildEventCountingWatcher wobject2;
+        rc=zoo_awget_children(zh,"/a",activeWatcher,
+                &wobject2,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        // this will process the response and activate the watcher
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(CHILD_EVENT,"/a"));
+        // make sure the watchers have been processed
+        while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK);
+        CPPUNIT_ASSERT_EQUAL(ZNOTHING,rc);
+
+        CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+#else
+    // verify: the default watcher is called once for a session event
+    void testDefaultSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTED_STATE);
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        ConnectionWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // wait till watcher proccessing has completed (the connection 
+        // established event)
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the watcher has been triggered
+        CPPUNIT_ASSERT(ensureCondition(watcher.isConnectionEstablished(),1000)<1000);
+        // triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+
+    // test case: connect to server, set a default watcher, disconnect from the server
+    // verify: the default watcher is called once
+    void testDefaultSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE);
+        DisconnectWatcher watcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &watcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        // set a default watch
+        AsyncCompletion ignored;
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        // now, initiate a disconnect
+        zkServer.setConnectionLost();
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the watcher has been triggered
+        CPPUNIT_ASSERT(watcher.disconnected_);
+        // triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
+    }
+
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE);
+        DisconnectWatcher defWatcher;
+        // use the tracker to find out when the watcher has been activated
+        WatcherActivationTracker activationTracker;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        AsyncCompletion ignored;
+        // this successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject;
+        activationTracker.track(&wobject);
+        // set a path-specific watcher
+        int rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+        
+        // now, initiate a disconnect
+        zkServer.setConnectionLost();
+        // make sure all watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: connect to the server, set a watcher object on a node, 
+    //           set a def watcher on another node,disconnect from the server
+    // verify: the watcher object as well as the default watcher are called
+    void testObjectSessionWatcher2(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(SESSION_EVENT,CONNECTING_STATE);
+        DisconnectWatcher defWatcher;
+        // use the tracker to find out when the watcher has been activated
+        WatcherActivationTracker activationTracker;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        // set a default watch
+        AsyncCompletion ignored;
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        activationTracker.track(&defWatcher);
+        int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+        // this successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject;
+        activationTracker.track(&wobject);
+        // set a path-specific watcher
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+        
+        // now, initiate a disconnect
+        zkServer.setConnectionLost();
+        // make sure all watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+        
+        // verify the default watcher has been triggered
+        CPPUNIT_ASSERT(defWatcher.disconnected_);
+        // and triggered only once
+        CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
+        
+        // the path-specific watcher has been triggered as well
+        CPPUNIT_ASSERT(wobject.disconnected_);
+        // only once!
+        CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
+    }
+
+    // testcase: register 2 node watches for different paths, trigger the watches
+    // verify: the data watchers are processed, the default watcher is not called
+    void testNodeWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(CHANGED_EVENT,0,false);
+        CountingDataWatcher defWatcher;
+        // use the tracker to find out when the watcher has been activated
+        WatcherActivationTracker activationTracker;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+
+        // don't care about completions
+        AsyncCompletion ignored;
+        // set a one-shot watch
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject1;
+        activationTracker.track(&wobject1);
+        int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+        // this successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        CountingDataWatcher wobject2;
+        activationTracker.track(&wobject2);
+        // set a path-specific watcher
+        rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
+                asyncCompletion,&ignored);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        // make sure the watcher gets activated before we continue
+        CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/a/b/c"));
+        zkServer.addRecvResponse(new ZNodeEvent(CHANGED_EVENT,"/x/y/z"));
+        // make sure all watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.deliveryCounterEquals(2),1000)<1000);
+        
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+    // testcase: set up both a children and a data watchers on the node /a, then
+    //           delete the node (that is, send a DELETE_EVENT)
+    // verify: both watchers are triggered
+    void testChildWatcher1(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(DELETED_EVENT,0);
+        DeletionCountingDataWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        DeletionCountingDataWatcher wobject1;
+        Stat stat;
+        // add a node watch
+        int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        DeletionCountingDataWatcher wobject2;
+        String_vector children;
+        rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
+        deallocate_String_vector(&children);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(DELETED_EVENT,"/a"));
+        // make sure the watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+        CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+    
+    // testcase: create both a child and data watch on the node /a, send a CHILD_EVENT
+    // verify: only the child watch triggered
+    void testChildWatcher2(){
+        Mock_gettimeofday timeMock;
+        // zookeeper simulator
+        ZookeeperServer zkServer;
+        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
+        // must call zookeeper_close() while all the mocks are in the scope!
+        CloseFinally guard(&zh);
+        
+        // detects when all watchers have been delivered
+        WatcherDeliveryTracker deliveryTracker(CHILD_EVENT,0);
+        ChildEventCountingWatcher defWatcher;
+        zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
+                &defWatcher,0);
+        CPPUNIT_ASSERT(zh!=0);
+        // make sure the client has connected
+        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
+        
+        // a successful server response will activate the watcher 
+        zkServer.addOperationResponse(new ZooStatResponse);
+        ChildEventCountingWatcher wobject1;
+        Stat stat;
+        // add a node watch
+        int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+        
+        typedef ZooGetChildrenResponse::StringVector ZooVector;
+        zkServer.addOperationResponse(new ZooGetChildrenResponse(
+                Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
+                ));
+        ChildEventCountingWatcher wobject2;
+        String_vector children;
+        rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
+        deallocate_String_vector(&children);
+        CPPUNIT_ASSERT_EQUAL(ZOK,rc);
+
+        // we are all set now; let's trigger the watches
+        zkServer.addRecvResponse(new ZNodeEvent(CHILD_EVENT,"/a"));
+        // make sure the watchers have been processed
+        CPPUNIT_ASSERT(ensureCondition(
+                deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
+
+        CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
+        CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);        
+        CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
+    }
+
+#endif //THREADED
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_watchers);

Propchange: hadoop/zookeeper/trunk/src/c/tests/TestWatchers.cc
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc Thu Jul 24 14:46:30 2008
@@ -38,7 +38,7 @@
     CPPUNIT_TEST(testCloseFromWatcher1);
     CPPUNIT_TEST_SUITE_END();
     zhandle_t *zh;
-    static void watcher(zhandle_t *, int, int, const char *){}
+    static void watcher(zhandle_t *, int, int, const char *,void*){}
 public: 
     void setUp()
     {
@@ -59,7 +59,7 @@
         virtual void onSessionExpired(zhandle_t* zh){
             memcpy(&lzh,zh,sizeof(lzh));
             if(callClose_)
-                rc=zookeeper_close(zh);           
+                rc=zookeeper_close(zh);
         }
         zhandle_t lzh;
         bool callClose_;
@@ -88,7 +88,7 @@
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
-        CPPUNIT_ASSERT_EQUAL(3,freeMock.callCounter);
+        CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter);
     }
     void testCloseUnconnected1()
     {
@@ -236,7 +236,7 @@
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
-        CPPUNIT_ASSERT_EQUAL(4,freeMock.callCounter);
+        CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter);
         // threads
         CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->io));
         CPPUNIT_ASSERT_EQUAL(1,MockPthreadsNull::getDestroyCounter(adaptor->completion));
@@ -410,6 +410,8 @@
             CPPUNIT_ASSERT(zh!=0);
             CPPUNIT_ASSERT(ensureCondition(SessionExpired(zh),1000)<1000);
             CPPUNIT_ASSERT(ensureCondition(IOThreadStopped(zh),1000)<1000);
+            // make sure the watcher has been processed
+            CPPUNIT_ASSERT(ensureCondition(closeAction.isWatcherTriggered(),1000)<1000);
             // make sure the threads have not been destroyed yet
             adaptor_threads* adaptor=(adaptor_threads*)zh->adaptor_priv;
             CPPUNIT_ASSERT_EQUAL(0,CheckedPthread::getDestroyCounter(adaptor->io));

Modified: hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc Thu Jul 24 14:46:30 2008
@@ -49,7 +49,7 @@
 	CPPUNIT_TEST_SUITE_END();
     zhandle_t *zh;
     MockPthreadsNull* pthreadMock;   
-    static void watcher(zhandle_t *, int , int , const char *){}
+    static void watcher(zhandle_t *, int , int , const char *,void*){}
 public: 
     Zookeeper_init():zh(0),pthreadMock(0){}
     

Added: hadoop/zookeeper/trunk/src/c/tests/Vector.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/Vector.h?rev=679557&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/Vector.h (added)
+++ hadoop/zookeeper/trunk/src/c/tests/Vector.h Thu Jul 24 14:46:30 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _VECTOR_UTIL_H
+#define _VECTOR_UTIL_H
+
+#include <vector>
+
+// function to conveniently stream vectors
+template <class U>
+std::ostream& operator<<(std::ostream& os,const std::vector<U>& c){
+  typedef std::vector<U> V;
+  os<<"[";
+  if(c.size()>0){
+      for(typename V::const_iterator it=c.begin();it!=c.end();++it)
+          os<<*it<<",";
+      os.seekp(-1,std::ios::cur);
+  }
+  os<<"]";
+  return os;
+}
+
+#endif // _VECTOR_UTIL_H

Propchange: hadoop/zookeeper/trunk/src/c/tests/Vector.h
------------------------------------------------------------------------------
    svn:eol-style = native