You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2013/03/07 07:01:50 UTC

svn commit: r1453693 [2/5] - in /zookeeper/trunk: ./ src/ src/c/ src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/ src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zook...

Modified: zookeeper/trunk/src/c/src/cli.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/cli.c?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/cli.c (original)
+++ zookeeper/trunk/src/c/src/cli.c Thu Mar  7 06:01:49 2013
@@ -57,7 +57,7 @@ static int recvd=0;
 
 static int shutdownThisThing=0;
 
-static __attribute__ ((unused)) void 
+static __attribute__ ((unused)) void
 printProfileInfo(struct timeval start, struct timeval end, int thres,
                  const char* msg)
 {
@@ -159,10 +159,10 @@ void dumpStat(const struct Stat *stat) {
     }
     tctime = stat->ctime/1000;
     tmtime = stat->mtime/1000;
-       
+
     ctime_r(&tmtime, tmtimes);
     ctime_r(&tctime, tctimes);
-       
+
     fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
     "\tmtime=%s\tmzxid=%llx\n"
     "\tversion=%x\taversion=%x\n"
@@ -326,6 +326,9 @@ void processline(char *line) {
       fprintf(stderr, "    myid\n");
       fprintf(stderr, "    verbose\n");
       fprintf(stderr, "    addauth <id> <scheme>\n");
+      fprintf(stderr, "    config\n");
+      fprintf(stderr, "    reconfig [-file <path> | -members <serverId=host:port1:port2;port3>,... | "
+                          " -add <serverId=host:port1:port2;port3>,... | -remove <serverId>,...] [-version <version>]\n");
       fprintf(stderr, "    quit\n");
       fprintf(stderr, "\n");
       fprintf(stderr, "    prefix the command with the character 'a' to run the command asynchronously.\n");
@@ -347,12 +350,122 @@ void processline(char *line) {
             fprintf(stderr, "Path must start with /, found: %s\n", line);
             return;
         }
-               
+
         rc = zoo_aget(zh, line, 1, my_data_completion, strdup(line));
         if (rc) {
             fprintf(stderr, "Error %d for %s\n", rc, line);
         }
-    } else if (startsWith(line, "set ")) {
+    } else if (strcmp(line, "config") == 0) {
+       gettimeofday(&startTime, 0);
+        rc = zoo_agetconfig(zh, 1, my_data_completion, strdup(ZOO_CONFIG_NODE));
+        if (rc) {
+            fprintf(stderr, "Error %d for %s\n", rc, line);
+        }
+   } else if (startsWith(line, "reconfig ")) {
+           line += 9;
+           int syntaxError = 0;
+
+           char* joining = NULL;
+           char* leaving = NULL;
+           char* members = NULL;
+           size_t members_size = 0;
+
+           int mode = 0; // 0 = not set, 1 = incremental, 2 = non-incremental
+           int64_t version = -1;
+
+           char *p = strtok (strdup(line)," ");
+
+           while (p != NULL) {
+               if (strcmp(p, "-add")==0) {
+                   p = strtok (NULL," ");
+                   if (mode == 2 || p == NULL) {
+                       syntaxError = 1;
+                       break;
+                   }
+                   mode = 1;
+                   joining = strdup(p);
+               } else if (strcmp(p, "-remove")==0){
+                   p = strtok (NULL," ");
+                   if (mode == 2 || p == NULL) {
+                       syntaxError = 1;
+                       break;
+                   }
+                   mode = 1;
+                   leaving = strdup(p);
+               } else if (strcmp(p, "-members")==0) {
+                   p = strtok (NULL," ");
+                   if (mode == 1 || p == NULL) {
+                       syntaxError = 1;
+                       break;
+                   }
+                   mode = 2;
+                   members = strdup(p);
+               } else if (strcmp(p, "-file")==0){
+                   p = strtok (NULL," ");
+                   if (mode == 1 || p == NULL) {
+                       syntaxError = 1;
+                       break;
+                   }
+                   mode = 2;
+                   FILE *fp = fopen(p, "r");
+                   if (fp == NULL) {
+                       fprintf(stderr, "Error reading file: %s\n", p);
+                       syntaxError = 1;
+                       break;
+                   }
+                   fseek(fp, 0L, SEEK_END);  /* Position to end of file */
+                   members_size = ftell(fp);     /* Get file length */
+                   rewind(fp);               /* Back to start of file */
+                   members = calloc(members_size + 1, sizeof(char));
+                   if(members == NULL )
+                   {
+                       fprintf(stderr, "\nInsufficient memory to read file: %s\n", p);
+                       syntaxError = 1;
+                       fclose(fp);
+                       break;
+                   }
+
+                   /* Read the entire file into members
+                    * NOTE: -- fread returns number of items successfully read
+                    * not the number of bytes. We're requesting one item of
+                    * members_size bytes. So we expect the return value here
+                    * to be 1.
+                    */
+                   if (fread(members, members_size, 1, fp) != 1){
+                       fprintf(stderr, "Error reading file: %s\n", p);
+                       syntaxError = 1;
+                       fclose(fp);
+                        break;
+                   }
+                   fclose(fp);
+               } else if (strcmp(p, "-version")==0){
+                   p = strtok (NULL," ");
+                   if (version != -1 || p == NULL){
+                       syntaxError = 1;
+                       break;
+                   }
+                   version = strtoull(p, NULL, 16);
+                   if (version < 0) {
+                       syntaxError = 1;
+                       break;
+                   }
+               } else {
+                   syntaxError = 1;
+                   break;
+               }
+               p = strtok (NULL," ");
+           }
+           if (syntaxError) return;
+
+           rc = zoo_areconfig(zh, joining, leaving, members, version, my_data_completion, strdup(line));
+           free(joining);
+           free(leaving);
+           free(members);
+           if (rc) {
+               fprintf(stderr, "Error %d for %s\n", rc, line);
+           }
+
+   } else if (startsWith(line, "set ")) {
         char *ptr;
         line += 4;
         if (line[0] != '/') {
@@ -497,7 +610,7 @@ void processline(char *line) {
         printf("session Id = %llx\n", _LL_CAST_ zoo_client_id(zh)->client_id);
     } else if (strcmp(line, "reinit") == 0) {
         zookeeper_close(zh);
-        // we can't send myid to the server here -- zookeeper_close() removes 
+        // we can't send myid to the server here -- zookeeper_close() removes
         // the session on the server. We must start anew.
         zh = zookeeper_init(hostPort, watcher, 30000, 0, 0, 0);
     } else if (startsWith(line, "quit")) {
@@ -528,7 +641,7 @@ int main(int argc, char **argv) {
 #endif
     char buffer[4096];
     char p[2048];
-#ifdef YCA  
+#ifdef YCA
     char *cert=0;
     char appId[64];
 #endif
@@ -656,7 +769,7 @@ int main(int argc, char **argv) {
           processline(cmd);
           processed=1;
         }
-        if (FD_ISSET(0, &rfds)) {
+        if (!processed && FD_ISSET(0, &rfds)) {
             int rc;
             int len = sizeof(buffer) - bufoff -1;
             if (len <= 0) {

Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Thu Mar  7 06:01:49 2013
@@ -189,10 +189,10 @@ static int deserialize_multi(int xid, co
 
 /* completion routine forward declarations */
 static int add_completion(zhandle_t *zh, int xid, int completion_type,
-        const void *dc, const void *data, int add_to_front, 
+        const void *dc, const void *data, int add_to_front,
         watcher_registration_t* wo, completion_head_t *clist);
 static completion_list_t* create_completion_entry(int xid, int completion_type,
-        const void *dc, const void *data, watcher_registration_t* wo, 
+        const void *dc, const void *data, watcher_registration_t* wo,
         completion_head_t *clist);
 static void destroy_completion_entry(completion_list_t* c);
 static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
@@ -438,12 +438,12 @@ static void setup_random()
 
 #ifndef __CYGWIN__
 /**
- * get the errno from the return code 
+ * get the errno from the return code
  * of get addrinfo. Errno is not set
  * with the call to getaddrinfo, so thats
  * why we have to do this.
  */
-static int getaddrinfo_errno(int rc) { 
+static int getaddrinfo_errno(int rc) {
     switch(rc) {
     case EAI_NONAME:
 // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
@@ -616,10 +616,10 @@ int resolve_hosts(const char *hosts_in, 
 
         if ((rc = getaddrinfo(host, port_spec, &hints, &res0)) != 0) {
             //bug in getaddrinfo implementation when it returns
-            //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and 
+            //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
             // ai_flags as AI_ADDRCONFIG
 #ifdef AI_ADDRCONFIG
-            if ((hints.ai_flags == AI_ADDRCONFIG) && 
+            if ((hints.ai_flags == AI_ADDRCONFIG) &&
 // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
 #ifdef EAI_ADDRFAMILY
                 ((rc ==EAI_BADFLAGS) || (rc == EAI_ADDRFAMILY))) {
@@ -707,19 +707,19 @@ fail:
  * a) the server this client is currently connected is not in new address list.
  *    Otherwise (if currentHost is in the new list):
  * b) the number of servers in the cluster is increasing - in this case the load
- *    on currentHost should decrease, which means that SOME of the clients 
+ *    on currentHost should decrease, which means that SOME of the clients
  *    connected to it will migrate to the new servers. The decision whether this
- *    client migrates or not is probabilistic so that the expected number of 
+ *    client migrates or not is probabilistic so that the expected number of
  *    clients connected to each server is the same.
- *    
- * If reconfig is set to true, the function sets pOld and pNew that correspond 
+ *
+ * If reconfig is set to true, the function sets pOld and pNew that correspond
  * to the probability to migrate to ones of the new servers or one of the old
  * servers (migrating to one of the old servers is done only if our client's
- * currentHost is not in new list). 
- * 
+ * currentHost is not in new list).
+ *
  * See zoo_cycle_next_server for the selection logic.
- * 
- * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the 
+ *
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
  * protocol and its evaluation,
  */
 int update_addrs(zhandle_t *zh)
@@ -741,7 +741,7 @@ int update_addrs(zhandle_t *zh)
     if (zh->hostname == NULL)
     {
         return ZSYSTEMERROR;
-    } 
+    }
 
     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
     lock_reconfig(zh);
@@ -784,7 +784,7 @@ int update_addrs(zhandle_t *zh)
             {
                 goto fail;
             }
-        } 
+        }
         else {
             rc = addrvec_append(&zh->addrs_new, resolved_address);
             if (rc != ZOK)
@@ -811,12 +811,12 @@ int update_addrs(zhandle_t *zh)
                 zh->reconfig = 0;
             }
         } else {
-            // my server is not in the new config, and load on old servers must 
+            // my server is not in the new config, and load on old servers must
             // be decreased, so connect to one of the new servers
             zh->pNew = 1;
             zh->pOld = 0;
         }
-    } 
+    }
 
     // Number of servers stayed the same or decreased
     else {
@@ -833,8 +833,8 @@ int update_addrs(zhandle_t *zh)
     addrvec_free(&zh->addrs);
     zh->addrs = resolved;
 
-    // If we need to do a reconfig and we're currently connected to a server, 
-    // then force close that connection so on next interest() call we'll make a 
+    // If we need to do a reconfig and we're currently connected to a server,
+    // then force close that connection so on next interest() call we'll make a
     // new connection
     if (zh->reconfig == 1 && zh->fd != -1)
     {
@@ -847,7 +847,7 @@ fail:
 
     unlock_reconfig(zh);
 
-    // If we short-circuited out and never assigned resolved to zh->addrs then we 
+    // If we short-circuited out and never assigned resolved to zh->addrs then we
     // need to free resolved to avoid a memleak.
     if (zh->addrs.data != resolved.data)
     {
@@ -1092,15 +1092,15 @@ int zoo_set_servers(zhandle_t *zh, const
  * we've updated the server list to connect to, and are now trying to find some
  * server to connect to. Once we get successfully connected, 'reconfig' mode is
  * set to false. Similarly, if we tried to connect to all servers in new config
- * and failed, 'reconfig' mode is set to false. 
+ * and failed, 'reconfig' mode is set to false.
  *
  * While in 'reconfig' mode, we should connect to a server in the new set of
- * servers (addrs_new) with probability pNew and to servers in the old set of 
+ * servers (addrs_new) with probability pNew and to servers in the old set of
  * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
- * out all servers in either, we continue to try servers from the other set, 
+ * out all servers in either, we continue to try servers from the other set,
  * regardless of pNew or pOld. If we tried all servers we give up and go back to
  * the normal round robin mode
- * 
+ *
  * When called, must be protected by lock_reconfig(zh).
  */
 static int get_next_server_in_reconfig(zhandle_t *zh)
@@ -1108,16 +1108,16 @@ static int get_next_server_in_reconfig(z
     int take_new = drand48() <= zh->pNew;
 
     LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
-               zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next, 
+               zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
                addrvec_hasnext(&zh->addrs_old)));
     LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
-               zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next, 
+               zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
                addrvec_hasnext(&zh->addrs_new)));
 
     // Take one of the new servers if we haven't tried them all yet
     // and either the probability tells us to connect to one of the new servers
     // or if we already tried them all then use one of the old servers
-    if (addrvec_hasnext(&zh->addrs_new) 
+    if (addrvec_hasnext(&zh->addrs_new)
             && (take_new || !addrvec_hasnext(&zh->addrs_old)))
     {
         addrvec_next(&zh->addrs_new, &zh->addr_cur);
@@ -1137,14 +1137,14 @@ static int get_next_server_in_reconfig(z
     return 1;
 }
 
-/** 
+/**
  * Cycle through our server list to the correct 'next' server. The 'next' server
  * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
  * mode means we've upated the server list and are now trying to find a server
  * to connect to. Once we get connected, we are no longer in the reconfig mode.
  * Similarly, if we try to connect to all the servers in the new configuration
  * and failed, reconfig mode is set to false.
- * 
+ *
  * For more algorithm details, see get_next_server_in_reconfig.
  */
 void zoo_cycle_next_server(zhandle_t *zh)
@@ -1172,7 +1172,7 @@ void zoo_cycle_next_server(zhandle_t *zh
     return;
 }
 
-/** 
+/**
  * Get the host:port for the server we are currently connecting to or connected
  * to. This is largely for testing purposes but is also generally useful for
  * other client software built on top of this client.
@@ -1364,9 +1364,9 @@ static int send_buffer(int fd, buffer_li
         if (rc == -1) {
 #ifndef _WINDOWS
             if (errno != EAGAIN) {
-#else            
+#else
             if (WSAGetLastError() != WSAEWOULDBLOCK) {
-#endif            
+#endif
                 return -1;
             } else {
                 return 0;
@@ -1383,9 +1383,9 @@ static int send_buffer(int fd, buffer_li
         if (rc == -1) {
 #ifndef _WINDOWS
             if (errno != EAGAIN) {
-#else            
+#else
             if (WSAGetLastError() != WSAEWOULDBLOCK) {
-#endif            
+#endif
                 return -1;
             }
         } else {
@@ -1670,7 +1670,7 @@ static int send_auth_info(zhandle_t *zh)
 }
 
 static int send_last_auth_info(zhandle_t *zh)
-{    
+{
     int rc = 0;
     auth_info *auth = NULL;
 
@@ -1724,7 +1724,7 @@ static int send_set_watches(zhandle_t *z
     /* add this buffer to the head of the send queue */
     rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
-    /* We queued the buffer, so don't free it */   
+    /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
     free_key_list(req.dataWatches.data, req.dataWatches.count);
     free_key_list(req.existWatches.data, req.existWatches.count);
@@ -1888,7 +1888,7 @@ int zookeeper_interest(zhandle_t *zh, in
     gettimeofday(&now, 0);
     if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
         int time_left = calculate_interval(&zh->next_deadline, &now);
-        int max_exceed = zh->recv_timeout / 10 > 200 ? 200 : 
+        int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
                          (zh->recv_timeout / 10);
         if (time_left > max_exceed)
             LOG_WARN(("Exceeded deadline by %dms", time_left));
@@ -1907,7 +1907,7 @@ int zookeeper_interest(zhandle_t *zh, in
 
     if (*fd == -1) {
 
-        /* 
+        /*
          * If we previously failed to connect to server pool (zh->delay == 1)
          * then we need delay our connection on this iteration 1/60 of the
          * recv timeout before trying again so we don't spin.
@@ -1921,7 +1921,7 @@ int zookeeper_interest(zhandle_t *zh, in
 
             LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
                      zh->hostname));
-        } 
+        }
 
         // No need to delay -- grab the next server and attempt connection
         else {
@@ -1944,7 +1944,7 @@ int zookeeper_interest(zhandle_t *zh, in
                 LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
             }
 #ifdef WIN32
-            ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);                    
+            ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
 #else
             fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
 #endif
@@ -2233,7 +2233,7 @@ static void process_sync_completion(
             cptr->c.type, cptr->xid, sc->rc));
 
     switch(cptr->c.type) {
-    case COMPLETION_DATA: 
+    case COMPLETION_DATA:
         if (sc->rc==0) {
             struct GetDataResponse res;
             int len;
@@ -2289,7 +2289,7 @@ static void process_sync_completion(
             const char * client_path;
             deserialize_CreateResponse(ia, "reply", &res);
             //ZOOKEEPER-1027
-            client_path = sub_string(zh, res.path); 
+            client_path = sub_string(zh, res.path);
             len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
                 len = sc->u.str.str_len;
             }
@@ -2672,7 +2672,7 @@ int zookeeper_process(zhandle_t *zh, int
                         *sc = (struct sync_completion*)cptr->data;
                 sc->rc = rc;
 
-                process_sync_completion(cptr, sc, ia, zh); 
+                process_sync_completion(cptr, sc, ia, zh);
 
                 notify_sync_completion(sc);
                 free_buffer(bptr);
@@ -2771,9 +2771,9 @@ static void destroy_completion_entry(com
     }
 }
 
-static void queue_completion_nolock(completion_head_t *list, 
+static void queue_completion_nolock(completion_head_t *list,
                                     completion_list_t *c,
-                                    int add_to_front) 
+                                    int add_to_front)
 {
     c->next = 0;
     /* appending a new entry to the back of the list */
@@ -2990,7 +2990,7 @@ static int isValidPath(const char* path,
  * REQUEST INIT HELPERS
  *---------------------------------------------------------------------------*/
 /* Common Request init helper functions to reduce code duplication */
-static int Request_path_init(zhandle_t *zh, int flags, 
+static int Request_path_init(zhandle_t *zh, int flags,
         char **path_out, const char *path)
 {
     assert(path_out);
@@ -3052,7 +3052,7 @@ int zoo_awget(zhandle_t *zh, const char 
     rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
     enter_critical(zh);
     rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
-        create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
+    create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
@@ -3067,6 +3067,87 @@ int zoo_awget(zhandle_t *zh, const char 
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }
 
+/**
+ * Asynchronously read the configuration node (ZOO_CONFIG_NODE).
+ *
+ * Thin wrapper around zoo_awgetconfig: when watch is non-zero the handle's
+ * default watcher (zh->watcher, with zh->context) is registered on the node,
+ * otherwise no watcher is set.
+ *
+ * Returns ZBADARGUMENTS if zh is NULL, otherwise the result of
+ * zoo_awgetconfig.
+ */
+int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
+        const void *data)
+{
+    /* zoo_awgetconfig rejects a NULL handle, but we read zh->watcher and
+     * zh->context below, so the check has to happen here first. */
+    if (zh == NULL) {
+        return ZBADARGUMENTS;
+    }
+    return zoo_awgetconfig(zh, watch ? zh->watcher : 0, zh->context, dc, data);
+}
+
+/**
+ * Asynchronously read the configuration node (ZOO_CONFIG_NODE), optionally
+ * registering the given watcher/watcherCtx on it.
+ *
+ * A GetData request for the config node is serialized and queued on
+ * zh->to_send together with a data completion (dc, data); a best-effort
+ * non-blocking send is then attempted.
+ *
+ * Returns ZBADARGUMENTS if zh is NULL, ZINVALIDSTATE if the handle is in an
+ * unrecoverable state, ZMARSHALLINGERROR if serializing or queueing the
+ * request failed, and ZOK otherwise.
+ */
+int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
+        data_completion_t dc, const void *data)
+{
+    /* The config path is a string literal used as-is: it is never duplicated,
+     * so (unlike the path-taking calls) there is nothing to free afterwards. */
+    const char *path = ZOO_CONFIG_NODE;
+    struct oarchive *oa;
+    struct RequestHeader h = { get_xid(), ZOO_GETDATA_OP };
+    struct GetDataRequest req = { (char *)path, watcher != 0 };
+    int rc;
+
+    if (zh == 0 || !isValidPath(path, 0)) {
+        return ZBADARGUMENTS;
+    }
+    if (is_unrecoverable(zh)) {
+        return ZINVALIDSTATE;
+    }
+    oa = create_buffer_oarchive();
+    rc = serialize_RequestHeader(oa, "header", &h);
+    rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
+    enter_critical(zh);
+    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
+            create_watcher_registration(path, data_result_checker, watcher, watcherCtx));
+    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+            get_buffer_len(oa));
+    leave_critical(zh);
+    /* We queued the buffer, so don't free it */
+    close_buffer_oarchive(&oa, 0);
+
+    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
+               zoo_get_current_server(zh)));
+    /* make a best (non-blocking) effort to send the requests asap */
+    adaptor_send_queue(zh, 0);
+    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+}
+
+/**
+ * Asynchronously submit a reconfiguration request.
+ *
+ * The joining, leaving and members strings are placed into a ReconfigRequest
+ * by reference and serialized; they are not modified or freed here. version
+ * is sent as the request's curConfigId. dc/data are registered as a data
+ * completion with no watcher, and a best-effort non-blocking send follows.
+ *
+ * Returns ZBADARGUMENTS if zh is NULL, ZINVALIDSTATE if the handle is in an
+ * unrecoverable state, ZMARSHALLINGERROR if serializing or queueing the
+ * request failed, and ZOK otherwise.
+ */
+int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
+       const char *members, int64_t version, data_completion_t dc, const void *data)
+{
+    struct RequestHeader header = { get_xid(), ZOO_RECONFIG_OP };
+    struct ReconfigRequest request;
+    struct oarchive *archive;
+    int status;
+
+    if (zh == 0) {
+        return ZBADARGUMENTS;
+    }
+    if (is_unrecoverable(zh)) {
+        return ZINVALIDSTATE;
+    }
+
+    /* The request only borrows the caller's strings for serialization. */
+    request.joiningServers = (char *)joining;
+    request.leavingServers = (char *)leaving;
+    request.newMembers = (char *)members;
+    request.curConfigId = version;
+
+    archive = create_buffer_oarchive();
+    status = serialize_RequestHeader(archive, "header", &header);
+    if (status >= 0) {
+        status = serialize_ReconfigRequest(archive, "req", &request);
+    }
+
+    enter_critical(zh);
+    if (status >= 0) {
+        status = add_data_completion(zh, header.xid, dc, data, NULL);
+    }
+    if (status >= 0) {
+        status = queue_buffer_bytes(&zh->to_send, get_buffer(archive),
+                get_buffer_len(archive));
+    }
+    leave_critical(zh);
+    /* We queued the buffer, so don't free it */
+    close_buffer_oarchive(&archive, 0);
+
+    LOG_DEBUG(("Sending Reconfig request xid=%#x to %s", header.xid,
+               zoo_get_current_server(zh)));
+    /* make a best (non-blocking) effort to send the requests asap */
+    adaptor_send_queue(zh, 0);
+
+    return status < 0 ? ZMARSHALLINGERROR : ZOK;
+}
+
 static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
         const char *path, const char *buffer, int buflen, int version)
 {
@@ -3223,7 +3304,7 @@ int zoo_acreate2(zhandle_t *zh, const ch
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }
 
-int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req, 
+int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
         const char *path, int version)
 {
     int rc = Request_path_init(zh, 0, &req->path, path);
@@ -3526,7 +3607,7 @@ static void op_result_stat_completion(in
     } else {
         result->stat = NULL ;
     }
-}   
+}
 
 static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
         const char *path, int version)
@@ -3565,16 +3646,16 @@ int zoo_amulti(zhandle_t *zh, int count,
             case ZOO_CREATE_OP: {
                 struct CreateRequest req;
 
-                rc = rc < 0 ? rc : CreateRequest_init(zh, &req, 
-                                        op->create_op.path, op->create_op.data, 
-                                        op->create_op.datalen, op->create_op.acl, 
+                rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
+                                        op->create_op.path, op->create_op.data,
+                                        op->create_op.datalen, op->create_op.acl,
                                         op->create_op.flags);
                 rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
                 result->value = op->create_op.buf;
                 result->valuelen = op->create_op.buflen;
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0); 
+                entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->create_op.path);
                 break;
@@ -3586,7 +3667,7 @@ int zoo_amulti(zhandle_t *zh, int count,
                 rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0); 
+                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->delete_op.path);
                 break;
@@ -3595,13 +3676,13 @@ int zoo_amulti(zhandle_t *zh, int count,
             case ZOO_SETDATA_OP: {
                 struct SetDataRequest req;
                 rc = rc < 0 ? rc : SetDataRequest_init(zh, &req,
-                                        op->set_op.path, op->set_op.data, 
+                                        op->set_op.path, op->set_op.data,
                                         op->set_op.datalen, op->set_op.version);
                 rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
                 result->stat = op->set_op.stat;
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0); 
+                entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->set_op.path);
                 break;
@@ -3614,15 +3695,15 @@ int zoo_amulti(zhandle_t *zh, int count,
                 rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);
 
                 enter_critical(zh);
-                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0); 
+                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
                 leave_critical(zh);
                 free_duplicate_path(req.path, op->check_op.path);
                 break;
-            } 
+            }
 
             default:
                 LOG_ERROR(("Unimplemented sub-op type=%d in multi-op", op->type));
-                return ZUNIMPLEMENTED; 
+                return ZUNIMPLEMENTED;
         }
 
         queue_completion(&clist, entry, 0);
@@ -3649,7 +3730,7 @@ int zoo_amulti(zhandle_t *zh, int count,
 }
 
 void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
-        int valuelen,  const struct ACL_vector *acl, int flags, 
+        int valuelen,  const struct ACL_vector *acl, int flags,
         char *path_buffer, int path_buffer_len)
 {
     assert(op);
@@ -3686,7 +3767,7 @@ void zoo_delete_op_init(zoo_op_t *op, co
     op->delete_op.version = version;
 }
 
-void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer, 
+void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
         int buflen, int version, struct Stat *stat)
 {
     assert(op);
@@ -3732,7 +3813,7 @@ int flush_send_queue(zhandle_t*zh, int t
     int rc= ZOK;
     struct timeval started;
 #ifdef WIN32
-    fd_set pollSet; 
+    fd_set pollSet;
     struct timeval wait;
 #endif
     gettimeofday(&started,0);
@@ -3757,7 +3838,7 @@ int flush_send_queue(zhandle_t*zh, int t
             FD_ZERO(&pollSet);
             FD_SET(zh->fd, &pollSet);
             // Poll the socket
-            rc = select((int)(zh->fd)+1, NULL,  &pollSet, NULL, &wait);      
+            rc = select((int)(zh->fd)+1, NULL,  &pollSet, NULL, &wait);
 #else
             struct pollfd fds;
             fds.fd = zh->fd;
@@ -3843,6 +3924,10 @@ const char* zerror(int c)
       return "(not error) no server responses to process";
     case ZSESSIONMOVED:
       return "session moved to another server, so operation is ignored";
+   case ZNEWCONFIGNOQUORUM:
+       return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
+   case ZRECONFIGINPROGRESS:
+     return "Another reconfiguration is in progress -- concurrent reconfigs not supported (yet)";
     }
     if (c > 0) {
       return strerror(c);
@@ -3862,7 +3947,7 @@ int zoo_add_auth(zhandle_t *zh,const cha
         return ZINVALIDSTATE;
 
     // [ZOOKEEPER-800] zoo_add_auth should return ZINVALIDSTATE if
-    // the connection is closed. 
+    // the connection is closed.
     if (zoo_state(zh) == 0) {
         return ZINVALIDSTATE;
     }
@@ -3919,12 +4004,12 @@ static const char* format_endpoint_info(
     }
 #endif
 #ifdef WIN32
-    addrstring = inet_ntoa (*(struct in_addr*)inaddr); 
+    addrstring = inet_ntoa (*(struct in_addr*)inaddr);
     sprintf(buf,"%s:%d",addrstring,ntohs(port));
 #else
     inet_ntop(ep->ss_family,inaddr,addrstr,sizeof(addrstr)-1);
     sprintf(buf,"%s:%d",addrstr,ntohs(port));
-#endif    
+#endif
     return buf;
 }
 
@@ -4056,6 +4141,48 @@ int zoo_wget(zhandle_t *zh, const char *
     return rc;
 }
 
+/*
+ * Synchronously read the configuration znode (ZOO_CONFIG_NODE).
+ * When 'watch' is non-zero, the handle's own watcher (zh->watcher) is left
+ * on the node with zh->context; when it is zero no watcher is installed.
+ * buffer, buffer_len and stat behave exactly as for zoo_wget.
+ */
+int zoo_getconfig(zhandle_t *zh, int watch, char *buffer,
+        int* buffer_len, struct Stat *stat)
+{
+    watcher_fn config_watcher = NULL;
+    if (watch) {
+        config_watcher = zh->watcher;
+    }
+    return zoo_wget(zh, ZOO_CONFIG_NODE, config_watcher, zh->context,
+                    buffer, buffer_len, stat);
+}
+
+/*
+ * Synchronously read the configuration znode (ZOO_CONFIG_NODE), leaving the
+ * caller-supplied watcher/watcherCtx on it. Thin wrapper over zoo_wget
+ * with the path fixed to the config node.
+ */
+int zoo_wgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
+       char *buffer, int* buffer_len, struct Stat *stat)
+{
+    const char *config_path = ZOO_CONFIG_NODE;
+    return zoo_wget(zh, config_path, watcher, watcherCtx,
+                    buffer, buffer_len, stat);
+}
+
+
+/*
+ * Synchronous reconfiguration: submits the request through zoo_areconfig
+ * and blocks until its completion has run.
+ *
+ * On input *buffer_len is the capacity of 'buffer', which the completion
+ * fills (presumably with the resulting configuration data). On ZOK,
+ * *buffer_len is updated to the length reported by the completion and
+ * *stat (when stat is non-NULL) receives the returned Stat.
+ *
+ * Returns ZBADARGUMENTS if buffer_len is NULL, ZSYSTEMERROR if the sync
+ * completion cannot be allocated, the error from zoo_areconfig if the
+ * submission fails, otherwise the completion's result code.
+ */
+int zoo_reconfig(zhandle_t *zh, const char *joining, const char *leaving,
+       const char *members, int64_t version, char *buffer, int* buffer_len,
+       struct Stat *stat)
+{
+    struct sync_completion *completion;
+    int result;
+
+    if (buffer_len == NULL) {
+        return ZBADARGUMENTS;
+    }
+    completion = alloc_sync_completion();
+    if (completion == NULL) {
+        return ZSYSTEMERROR;
+    }
+
+    completion->u.data.buffer = buffer;
+    completion->u.data.buff_len = *buffer_len;
+    result = zoo_areconfig(zh, joining, leaving, members, version,
+                           SYNCHRONOUS_MARKER, completion);
+    if (result == ZOK) {
+        wait_sync_completion(completion);
+        result = completion->rc;
+        if (result == ZOK) {
+            if (stat != NULL) {
+                *stat = completion->u.data.stat;
+            }
+            *buffer_len = completion->u.data.buff_len;
+        }
+    }
+    free_sync_completion(completion);
+    return result;
+}
+
 int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
         int version)
 {

Added: zookeeper/trunk/src/c/tests/TestReconfigServer.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReconfigServer.cc?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReconfigServer.cc (added)
+++ zookeeper/trunk/src/c/tests/TestReconfigServer.cc Thu Mar  7 06:01:49 2013
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+#include <algorithm>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <cppunit/extensions/HelperMacros.h>
+#include "zookeeper.h"
+
+#include "Util.h"
+#include "ZooKeeperQuorumServer.h"
+
+/**
+ * Integration tests for dynamic reconfiguration (zoo_getconfig /
+ * zoo_reconfig) against a real local ensemble of NUM_SERVERS servers
+ * started through ZooKeeperQuorumServer.
+ */
+class TestReconfigServer : public CPPUNIT_NS::TestFixture {
+    CPPUNIT_TEST_SUITE(TestReconfigServer);
+    // Tests are only registered in THREADED builds (presumably because they
+    // rely on the synchronous client API).
+#ifdef THREADED
+    CPPUNIT_TEST(testNonIncremental);
+    CPPUNIT_TEST(testRemoveConnectedFollower);
+    CPPUNIT_TEST(testRemoveFollower);
+#endif
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    TestReconfigServer();
+    virtual ~TestReconfigServer();
+    void setUp();
+    void tearDown();
+    void testNonIncremental();
+    void testRemoveConnectedFollower();
+    void testRemoveFollower();
+
+  private:
+    // Number of servers in the ensemble started for each test.
+    static const uint32_t NUM_SERVERS;
+    // Destination of the client library's log output.
+    FILE* logfile_;
+    // Running servers; getCluster() gives the server at index i the id i.
+    std::vector<ZooKeeperQuorumServer*> cluster_;
+    // Index into cluster_ of the first server in leader mode, or -1.
+    int32_t getLeader();
+    // Indices into cluster_ of every server in follower mode.
+    std::vector<int32_t> getFollowers();
+    // Splits a config blob into its "server." lines and "version=" value.
+    void parseConfig(char* buf, int len, std::vector<std::string>& servers,
+                     std::string& version);
+};
+
+// Size of the ensemble started for every test.
+const uint32_t TestReconfigServer::NUM_SERVERS = 3;
+
+// Opens this suite's log file and routes the client library's logging there.
+TestReconfigServer::TestReconfigServer()
+    : logfile_(openlogfile("TestReconfigServer")) {
+    zoo_set_log_stream(logfile_);
+}
+
+// Flushes and closes the log file opened by the constructor, if any.
+TestReconfigServer::~TestReconfigServer() {
+    if (logfile_ == NULL) {
+        return;
+    }
+    fflush(logfile_);
+    fclose(logfile_);
+    logfile_ = NULL;
+}
+
+// Starts a fresh NUM_SERVERS ensemble before each test.
+void TestReconfigServer::setUp() {
+    cluster_ = ZooKeeperQuorumServer::getCluster(NUM_SERVERS);
+    // Fixed grace period for the ensemble to come up before the test runs.
+    sleep(2);
+}
+
+// Deletes every server (its destructor stops the process) and empties
+// the ensemble.
+void TestReconfigServer::tearDown() {
+    typedef std::vector<ZooKeeperQuorumServer*>::iterator ServerIter;
+    for (ServerIter it = cluster_.begin(); it != cluster_.end(); ++it) {
+        delete *it;
+    }
+    cluster_.clear();
+}
+
+// Returns the index (into cluster_) of the first server that reports
+// leader mode, or -1 when none does.
+int32_t TestReconfigServer::
+getLeader() {
+    int32_t leader = -1;
+    for (size_t idx = 0; idx < cluster_.size() && leader < 0; ++idx) {
+        if (cluster_[idx]->isLeader()) {
+            leader = static_cast<int32_t>(idx);
+        }
+    }
+    return leader;
+}
+
+// Returns, in ascending order, the indices (into cluster_) of every server
+// that reports follower mode.
+std::vector<int32_t> TestReconfigServer::
+getFollowers() {
+    std::vector<int32_t> result;
+    for (size_t idx = 0; idx != cluster_.size(); ++idx) {
+        if (!cluster_[idx]->isFollower()) {
+            continue;
+        }
+        result.push_back(static_cast<int32_t>(idx));
+    }
+    return result;
+}
+
+/**
+ * Splits a configuration returned by zoo_getconfig/zoo_reconfig into its
+ * "server." lines and the value of its "version=" line.
+ *
+ * @param buf     config bytes (not necessarily NUL-terminated)
+ * @param len     number of valid bytes in buf; a negative value (which the
+ *                client reports when the node holds no data) is treated as
+ *                an empty config
+ * @param servers out: every line starting with "server.", in order
+ * @param version out: text after "version=" on the last such line, or
+ *                empty when the config has no version line
+ */
+void TestReconfigServer::
+parseConfig(char* buf, int len, std::vector<std::string>& servers,
+            std::string& version) {
+    const std::string serverPrefix("server.");
+    const std::string versionPrefix("version=");
+    // std::string(buf, len) with a negative len would convert to a huge size.
+    std::string config(buf, len > 0 ? static_cast<size_t>(len) : 0);
+    std::stringstream ss(config);
+    std::string line;
+    servers.clear();
+    // Reset so a config without a version line cannot leave a stale value
+    // from a previous call in place.
+    version.clear();
+    while (std::getline(ss, line, '\n')) {
+        if (line.compare(0, serverPrefix.size(), serverPrefix) == 0) {
+            servers.push_back(line);
+        } else if (line.compare(0, versionPrefix.size(), versionPrefix) == 0) {
+            version = line.substr(versionPrefix.size());
+        }
+    }
+}
+
+/**
+ * 1. Connect to the leader.
+ * 2. Remove a follower using incremental reconfig.
+ * 3. Add the follower back using incremental reconfig.
+ *
+ * getCluster() gives the server at index i the id i, so a follower's index
+ * is also the id passed in the "leaving" list.
+ */
+void TestReconfigServer::
+testRemoveFollower() {
+    std::vector<std::string> servers;
+    std::string version;
+    struct Stat stat;
+    // Fixed-size buffer: variable-length arrays are not standard C++.
+    char buf[1024];
+    int len = sizeof(buf);
+
+    // get config from leader.
+    int32_t leader = getLeader();
+    CPPUNIT_ASSERT(leader >= 0);
+    std::string host = cluster_[leader]->getHostPort();
+    zhandle_t* zk = zookeeper_init(host.c_str(), NULL, 10000, NULL, NULL, 0);
+    CPPUNIT_ASSERT(zk != NULL);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, zoo_getconfig(zk, 0, buf, &len, &stat));
+
+    // check if all the servers are listed in the config.
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("0"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // remove a follower.
+    std::vector<int32_t> followers = getFollowers();
+    // len was overwritten with the reply length; restore full capacity.
+    len = sizeof(buf);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1,
+                         (uint32_t)(followers.size()));
+    std::stringstream ss;
+    ss << followers[0];
+    int rc = zoo_reconfig(zk, NULL, ss.str().c_str(), NULL, -1, buf, &len,
+                          &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("100000002"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        if (i == followers[0]) {
+            continue;
+        }
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // add the follower back.
+    len = sizeof(buf);
+    std::string serverString = cluster_[followers[0]]->getServerString();
+    rc = zoo_reconfig(zk, serverString.c_str(), NULL, NULL, -1, buf, &len,
+                          &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+    zookeeper_close(zk);
+}
+
+/**
+ * 1. Connect to the leader.
+ * 2. Remove a follower using non-incremental reconfig.
+ * 3. Add the follower back using non-incremental reconfig.
+ *
+ * Non-incremental reconfig passes the complete new membership as a
+ * comma-separated list of server strings (built without a trailing comma).
+ */
+void TestReconfigServer::
+testNonIncremental() {
+    std::vector<std::string> servers;
+    std::string version;
+    struct Stat stat;
+    // Fixed-size buffer: variable-length arrays are not standard C++.
+    char buf[1024];
+    int len = sizeof(buf);
+
+    // get config from leader.
+    int32_t leader = getLeader();
+    CPPUNIT_ASSERT(leader >= 0);
+    std::string host = cluster_[leader]->getHostPort();
+    zhandle_t* zk = zookeeper_init(host.c_str(), NULL, 10000, NULL, NULL, 0);
+    CPPUNIT_ASSERT(zk != NULL);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, zoo_getconfig(zk, 0, buf, &len, &stat));
+
+    // check if all the servers are listed in the config.
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("0"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // remove a follower: the new membership is every follower except
+    // followers[0], plus the leader.
+    std::vector<int32_t> followers = getFollowers();
+    len = sizeof(buf);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1,
+                         (uint32_t)(followers.size()));
+    std::stringstream ss;
+    for (int i = 1; i < followers.size(); i++) {
+      ss << cluster_[followers[i]]->getServerString() << ",";
+    }
+    ss << cluster_[leader]->getServerString();
+
+    int rc = zoo_reconfig(zk, NULL, NULL, ss.str().c_str(), -1, buf, &len,
+                          &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("100000002"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        if (i == followers[0]) {
+            continue;
+        }
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // add the follower back: the new membership is the whole cluster.
+    len = sizeof(buf);
+    ss.str("");
+    for (int i = 0; i < cluster_.size(); i++) {
+        if (i > 0) {
+            ss << ",";
+        }
+        ss << cluster_[i]->getServerString();
+    }
+    rc = zoo_reconfig(zk, NULL, NULL, ss.str().c_str(), -1, buf, &len,
+                          &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+    zookeeper_close(zk);
+}
+
+/**
+ * 1. Connect to a follower.
+ * 2. Remove the follower the client is connected to.
+ * 3. Verify (via a fresh getconfig) that it is gone from the config.
+ */
+void TestReconfigServer::
+testRemoveConnectedFollower() {
+    std::vector<std::string> servers;
+    std::string version;
+    struct Stat stat;
+    // Fixed-size buffer: variable-length arrays are not standard C++.
+    char buf[1024];
+    int len = sizeof(buf);
+
+    // connect to a follower.
+    int32_t leader = getLeader();
+    std::vector<int32_t> followers = getFollowers();
+    CPPUNIT_ASSERT(leader >= 0);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(followers.size()));
+    std::stringstream ss;
+    for (int i = 0; i < followers.size(); i++) {
+      ss << cluster_[followers[i]]->getHostPort() << ",";
+    }
+    ss << cluster_[leader]->getHostPort();
+    std::string hosts = ss.str();
+    // Use the host list in order so the client picks followers[0] first.
+    zoo_deterministic_conn_order(true);
+    zhandle_t* zk = zookeeper_init(hosts.c_str(), NULL, 10000, NULL, NULL, 0);
+    CPPUNIT_ASSERT(zk != NULL);
+    std::string connectedHost(zoo_get_current_server(zk));
+    std::string portString = connectedHost.substr(connectedHost.find(":") + 1);
+    uint32_t port;
+    std::istringstream (portString) >> port;
+    CPPUNIT_ASSERT_EQUAL(cluster_[followers[0]]->getClientPort(), port);
+
+    // remove the follower.
+    len = sizeof(buf);
+    ss.str("");
+    ss << followers[0];
+    // The result is deliberately not asserted: the server handling this
+    // request is the one being removed, so the reply may presumably be lost
+    // (e.g. connection loss) even when the reconfig commits.
+    zoo_reconfig(zk, NULL, ss.str().c_str(), NULL, -1, buf, &len, &stat);
+    // zoo_reconfig may have shrunk len to its reply size; restore capacity.
+    len = sizeof(buf);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, zoo_getconfig(zk, 0, buf, &len, &stat));
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(servers.size()));
+    for (int i = 0; i < cluster_.size(); i++) {
+        if (i == followers[0]) {
+            continue;
+        }
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+                       cluster_[i]->getServerString()) != servers.end());
+    }
+    zookeeper_close(zk);
+    // Do not leak deterministic host ordering into later tests.
+    zoo_deterministic_conn_order(false);
+}
+
+CPPUNIT_TEST_SUITE_REGISTRATION(TestReconfigServer);

Added: zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc (added)
+++ zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc Thu Mar  7 06:01:49 2013
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+#include "ZooKeeperQuorumServer.h"
+
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <sstream>
+
+/**
+ * Creates server 'id' of a 'numServers'-member ensemble. It writes the
+ * server's config file and a fresh data directory under $ZKROOT, then
+ * starts the server with zkServer.sh.
+ * Aborts if the ZKROOT environment variable is not set.
+ */
+ZooKeeperQuorumServer::
+ZooKeeperQuorumServer(uint32_t id, uint32_t numServers) :
+    // Listed in declaration order (numServers_ precedes id_ in the class).
+    numServers_(numServers),
+    id_(id) {
+    const char* root = getenv("ZKROOT");
+    if (root == NULL) {
+        // Assigning NULL to std::string is undefined behavior, so fail hard
+        // even when assertions are compiled out.
+        fprintf(stderr, "Environment variable 'ZKROOT' is not set\n");
+        abort();
+    }
+    root_ = root;
+    createConfigFile();
+    createDataDirectory();
+    start();
+}
+
+// Stops the server process; config and data files are not removed here.
+ZooKeeperQuorumServer::~ZooKeeperQuorumServer() {
+    stop();
+}
+
+// "localhost:<client port>", usable as a zookeeper_init host string.
+std::string ZooKeeperQuorumServer::getHostPort() {
+    std::ostringstream out;
+    out << "localhost:" << getClientPort();
+    return out.str();
+}
+
+// The client port is CLIENT_PORT_BASE offset by this server's id.
+uint32_t ZooKeeperQuorumServer::getClientPort() {
+    return id_ + CLIENT_PORT_BASE;
+}
+
+// Launches this server with "$ZKROOT/bin/zkServer.sh start <config file>".
+void ZooKeeperQuorumServer::
+start() {
+    std::string command = root_ + "/bin/zkServer.sh start " +
+                          getConfigFileName();
+    // system() must stay outside assert(): under NDEBUG the whole assert
+    // expression is discarded and the server would never be started.
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+}
+
+// Stops this server with "$ZKROOT/bin/zkServer.sh stop <config file>".
+void ZooKeeperQuorumServer::
+stop() {
+    std::string command = root_ + "/bin/zkServer.sh stop " +
+                          getConfigFileName();
+    // Same reasoning as start(): keep the side effect out of assert().
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+}
+
+/**
+ * Runs "zkServer.sh status" for this server and returns "leader" or
+ * "follower" according to the "Mode:" line in its output. Any other output
+ * is printed and trips an assertion. When assertions are disabled,
+ * "unknown" is returned instead of falling off the end of the function.
+ */
+std::string ZooKeeperQuorumServer::
+getMode() {
+    char buf[1024];
+    std::string result;
+    std::string command = root_ + "/bin/zkServer.sh status " +
+                          getConfigFileName();
+    FILE* output = popen(command.c_str(), "r");
+    if (output != NULL) {
+        // Stop on EOF *or* read error; a loop that only tests feof() spins
+        // forever if fgets fails for any other reason.
+        while (fgets(buf, sizeof(buf), output) != NULL) {
+            result += buf;
+        }
+        pclose(output);
+    }
+    if (result.find("Mode: leader") != std::string::npos) {
+        return "leader";
+    } else if (result.find("Mode: follower") != std::string::npos) {
+        return "follower";
+    }
+    printf("%s\n", result.c_str());
+    assert(!"unknown mode");
+    return "unknown";
+}
+
+// True when "zkServer.sh status" reports this server as the leader.
+bool ZooKeeperQuorumServer::
+isLeader() {
+    const std::string mode = getMode();
+    return mode.compare("leader") == 0;
+}
+
+// True when "zkServer.sh status" reports this server as a follower.
+bool ZooKeeperQuorumServer::
+isFollower() {
+    const std::string mode = getMode();
+    return mode.compare("follower") == 0;
+}
+
+/**
+ * Writes this server's config file (the path returned by
+ * getConfigFileName(), which start()/stop() hand to zkServer.sh). The file
+ * contains fixed tick/init/sync limits, this server's client port and data
+ * directory, and one "server.N=..." line per ensemble member.
+ */
+void ZooKeeperQuorumServer::
+createConfigFile() {
+    std::string command = "mkdir -p " + root_ + "/build/test/test-cppunit/conf";
+    // Keep system() outside assert() so it still runs under NDEBUG.
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+    std::ofstream confFile;
+    // Reuse getConfigFileName() so the path cannot drift from start()/stop().
+    std::string fileName = getConfigFileName();
+    confFile.open(fileName.c_str());
+    confFile << "tickTime=2000\n";
+    confFile << "clientPort=" << getClientPort() << "\n";
+    confFile << "initLimit=5\n";
+    confFile << "syncLimit=2\n";
+    confFile << "dataDir=" << getDataDirectory() << "\n";
+    for (uint32_t i = 0; i < numServers_; i++) {
+        confFile << getServerString(i) << "\n";
+    }
+    confFile.close();
+}
+
+// Path of this server's config file:
+// $ZKROOT/build/test/test-cppunit/conf/<id>.conf
+std::string ZooKeeperQuorumServer::
+getConfigFileName() {
+    std::ostringstream name;
+    name << root_ << "/build/test/test-cppunit/conf/" << id_ << ".conf";
+    return name.str();
+}
+
+/**
+ * Recreates this server's data directory from scratch (rm -rf, then
+ * mkdir -p) and writes its myid file containing this server's id. It also
+ * sets ZOO_LOG_DIR to that directory for this process, presumably so the
+ * zkServer.sh started afterwards logs there.
+ */
+void ZooKeeperQuorumServer::
+createDataDirectory() {
+    std::string dataDirectory = getDataDirectory();
+    // system() is kept out of assert() so it still runs under NDEBUG.
+    std::string command = "rm -rf " + dataDirectory;
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    command = "mkdir -p " + dataDirectory;
+    rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+    std::ofstream myidFile;
+    std::string fileName = dataDirectory + "/myid";
+    myidFile.open(fileName.c_str());
+    myidFile << id_ << "\n";
+    myidFile.close();
+    setenv("ZOO_LOG_DIR", dataDirectory.c_str(), true);
+}
+
+// This server's "server.<id>=..." config line.
+std::string ZooKeeperQuorumServer::
+getServerString() {
+    return getServerString(id_);
+}
+
+// Config line for server 'id', every port being its base plus the id:
+// server.<id>=localhost:<server port>:<election port>:participant;localhost:<client port>
+std::string ZooKeeperQuorumServer::
+getServerString(uint32_t id) {
+    const uint32_t serverPort = SERVER_PORT_BASE + id;
+    const uint32_t electionPort = ELECTION_PORT_BASE + id;
+    const uint32_t clientPort = CLIENT_PORT_BASE + id;
+    std::ostringstream line;
+    line << "server." << id
+         << "=localhost:" << serverPort
+         << ":" << electionPort
+         << ":participant;localhost:" << clientPort;
+    return line.str();
+}
+
+// Data directory of this server: $ZKROOT/build/test/test-cppunit/data<id>
+std::string ZooKeeperQuorumServer::
+getDataDirectory() {
+    std::ostringstream dir;
+    dir << root_ << "/build/test/test-cppunit/data" << id_;
+    return dir.str();
+}
+
+// Creates (and, through the constructor, starts) servers with ids
+// 0..numServers-1. The caller owns the returned pointers and must delete
+// each one.
+std::vector<ZooKeeperQuorumServer*> ZooKeeperQuorumServer::
+getCluster(uint32_t numServers) {
+    std::vector<ZooKeeperQuorumServer*> servers;
+    uint32_t nextId = 0;
+    while (nextId < numServers) {
+        servers.push_back(new ZooKeeperQuorumServer(nextId, numServers));
+        ++nextId;
+    }
+    return servers;
+}

Added: zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h (added)
+++ zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h Thu Mar  7 06:01:49 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+#ifndef ZOOKEEPER_QUORUM_SERVER_H
+#define ZOOKEEPER_QUORUM_SERVER_H
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+/**
+ * Test helper that runs one member of a local ZooKeeper ensemble as a
+ * separate process via $ZKROOT/bin/zkServer.sh. Instances are obtained
+ * through getCluster().
+ */
+class ZooKeeperQuorumServer {
+  public:
+    // Stops the server process.
+    ~ZooKeeperQuorumServer();
+    // Creates and starts servers with ids 0..numServers-1; the caller owns
+    // (and must delete) the returned pointers.
+    static std::vector<ZooKeeperQuorumServer*> getCluster(uint32_t numServers);
+    // "localhost:<client port>".
+    std::string getHostPort();
+    // CLIENT_PORT_BASE + id.
+    uint32_t getClientPort();
+    // Start / stop the server process with zkServer.sh.
+    void start();
+    void stop();
+    // Mode as reported by "zkServer.sh status".
+    bool isLeader();
+    bool isFollower();
+    // This server's "server.<id>=..." config line.
+    std::string getServerString();
+
+  private:
+    // The default constructor, copy constructor and assignment operator are
+    // private and never defined, so instances cannot be default-constructed
+    // or copied.
+    ZooKeeperQuorumServer();
+    // Writes config and data directory, then starts the server.
+    ZooKeeperQuorumServer(uint32_t id, uint32_t numServers);
+    ZooKeeperQuorumServer(const ZooKeeperQuorumServer& that);
+    const ZooKeeperQuorumServer& operator=(const ZooKeeperQuorumServer& that);
+    void createConfigFile();
+    std::string getConfigFileName();
+    void createDataDirectory();
+    std::string getDataDirectory();
+    static std::string getServerString(uint32_t id);
+    // "leader" or "follower", parsed from "zkServer.sh status" output.
+    std::string getMode();
+
+    // Per-server ports are base + server id.
+    static const uint32_t SERVER_PORT_BASE = 2000;
+    static const uint32_t ELECTION_PORT_BASE = 3000;
+    static const uint32_t CLIENT_PORT_BASE = 4000;
+
+    // NOTE: members are initialized in this declaration order (numServers_
+    // before id_) regardless of the constructor's initializer-list order.
+    uint32_t numServers_;
+    uint32_t id_;
+    // Value of $ZKROOT; all paths and scripts are resolved beneath it.
+    std::string root_;
+};
+
+#endif  // ZOOKEEPER_QUORUM_SERVER_H

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java Thu Mar  7 06:01:49 2013
@@ -103,6 +103,10 @@ public abstract class KeeperException ex
                 return new UnimplementedException();
             case OPERATIONTIMEOUT:
                 return new OperationTimeoutException();
+            case NEWCONFIGNOQUORUM:
+               return new NewConfigNoQuorum();
+            case RECONFIGINPROGRESS:
+               return new ReconfigInProgress();
             case BADARGUMENTS:
                 return new BadArgumentsException();
             case APIERROR:
@@ -277,10 +281,16 @@ public abstract class KeeperException ex
          */
         @Deprecated
         public static final int AuthFailed = -115;
-        /**
-         * This value will be used directly in {@link CODE#SESSIONMOVED}
-         */
-        // public static final int SessionMoved = -118;
+        
+        // This value will be used directly in {@link CODE#SESSIONMOVED}
+        // public static final int SessionMoved = -118;       
+        
+        @Deprecated
+        public static final int NewConfigNoQuorum = -120;
+        
+        @Deprecated
+        public static final int ReconfigInProgress= -121;
+        
     }
 
     /** Codes which represent the various KeeperException
@@ -313,6 +323,11 @@ public abstract class KeeperException ex
         OPERATIONTIMEOUT (OperationTimeout),
         /** Invalid arguments */
         BADARGUMENTS (BadArguments),
+        /** No quorum of the new config is connected and up-to-date with the leader of the last committed config - try
+         *  invoking reconfiguration after the new servers are connected and synced */
+        NEWCONFIGNOQUORUM (NewConfigNoQuorum),
+        /** Another reconfiguration is in progress -- concurrent reconfigs not supported (yet) */
+        RECONFIGINPROGRESS (ReconfigInProgress),
         
         /** API errors.
          * This is never thrown by the server, it shouldn't be used other than
@@ -326,7 +341,8 @@ public abstract class KeeperException ex
         NONODE (NoNode),
         /** Not authenticated */
         NOAUTH (NoAuth),
-        /** Version conflict */
+        /** Version conflict.
+         *  In case of reconfiguration: the reconfig was requested from config version X but the last seen config has a different version Y */
         BADVERSION (BadVersion),
         /** Ephemeral nodes may not have children */
         NOCHILDRENFOREPHEMERALS (NoChildrenForEphemerals),
@@ -390,6 +406,10 @@ public abstract class KeeperException ex
                 return "ConnectionLoss";
             case MARSHALLINGERROR:
                 return "MarshallingError";
+            case NEWCONFIGNOQUORUM:
+               return "NewConfigNoQuorum";
+            case RECONFIGINPROGRESS:
+               return "ReconfigInProgress";
             case UNIMPLEMENTED:
                 return "Unimplemented";
             case OPERATIONTIMEOUT:
@@ -590,6 +610,24 @@ public abstract class KeeperException ex
     }
 
     /**
+     * Thrown for {@link Code#NEWCONFIGNOQUORUM}: no quorum of the proposed
+     * configuration is connected and up-to-date with the leader of the last
+     * committed configuration.
+     *
+     * @see Code#NEWCONFIGNOQUORUM
+     */
+    public static class NewConfigNoQuorum extends KeeperException {
+        public NewConfigNoQuorum() {
+            super(Code.NEWCONFIGNOQUORUM);
+        }
+    }
+    
+    /**
+     * Thrown for {@link Code#RECONFIGINPROGRESS}: another reconfiguration is
+     * already in progress; concurrent reconfigurations are not supported.
+     *
+     * @see Code#RECONFIGINPROGRESS
+     */
+    public static class ReconfigInProgress extends KeeperException {
+        public ReconfigInProgress() {
+            super(Code.RECONFIGINPROGRESS);
+        }
+    }
+    
+    /**
      * @see Code#NOCHILDRENFOREPHEMERALS
      */
     public static class NoChildrenForEphemeralsException extends KeeperException {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java Thu Mar  7 06:01:49 2013
@@ -25,6 +25,9 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 
 public class ZooDefs {
+   
+   final public static String CONFIG_NODE = "/zookeeper/config";
+   
     public interface OpCode {
         public final int notification = 0;
 
@@ -56,6 +59,8 @@ public class ZooDefs {
         
         public final int create2 = 15;
 
+        public final int reconfig = 16;
+        
         public final int auth = 100;
 
         public final int setWatches = 101;
@@ -117,5 +122,5 @@ public class ZooDefs {
 
     final public static String[] opNames = { "notification", "create",
             "delete", "exists", "getData", "setData", "getACL", "setACL",
-            "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping" };
+            "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig" };
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Thu Mar  7 06:01:49 2013
@@ -18,28 +18,61 @@
 
 package org.apache.zookeeper;
 
-import org.apache.zookeeper.AsyncCallback.*;
-import org.apache.zookeeper.ClientCnxn.SendThread;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zookeeper.AsyncCallback.ACLCallback;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.Create2Callback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.client.ConnectStringParser;
-import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.StaticHostProvider;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.*;
+import org.apache.zookeeper.proto.Create2Request;
+import org.apache.zookeeper.proto.Create2Response;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.CreateResponse;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.ExistsRequest;
+import org.apache.zookeeper.proto.GetACLRequest;
+import org.apache.zookeeper.proto.GetACLResponse;
+import org.apache.zookeeper.proto.GetChildren2Request;
+import org.apache.zookeeper.proto.GetChildren2Response;
+import org.apache.zookeeper.proto.GetChildrenRequest;
+import org.apache.zookeeper.proto.GetChildrenResponse;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.proto.GetDataResponse;
+import org.apache.zookeeper.proto.ReconfigRequest;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetACLResponse;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SyncRequest;
+import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-
 /**
  * This is the main class of ZooKeeper client library. To use a ZooKeeper
  * service, an application must first instantiate an object of ZooKeeper class.
@@ -1415,6 +1448,181 @@ public class ZooKeeper {
     }
 
     /**
+     * Return the last committed configuration (as known to the server to which the client is connected)
+     * and the stat of the configuration.
+     * <p>
+     * If the watch is non-null and the call is successful (no exception is
+     * thrown), a watch will be left on the configuration node (ZooDefs.CONFIG_NODE). The watch
+     * will be triggered by a successful reconfig operation
+     * <p>
+     * A KeeperException with error code KeeperException.NoNode will be thrown
+     * if the configuration node doesn't exists.
+     *
+     * @param watcher explicit watcher
+     * @param stat the stat of the configuration node ZooDefs.CONFIG_NODE
+     * @return configuration data stored in ZooDefs.CONFIG_NODE
+     * @throws KeeperException If the server signals an error with a non-zero error code
+     * @throws InterruptedException If the server transaction is interrupted.
+     */
+    public byte[] getConfig(Watcher watcher, Stat stat)
+        throws KeeperException, InterruptedException
+     {
+        final String configZnode = ZooDefs.CONFIG_NODE;
+ 
+        // the watch contains the un-chroot path
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new DataWatchRegistration(watcher, configZnode);
+        }
+
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.getData);
+        GetDataRequest request = new GetDataRequest();
+        request.setPath(configZnode);
+        request.setWatch(watcher != null);
+        GetDataResponse response = new GetDataResponse();
+        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
+                   configZnode);
+        }
+        if (stat != null) {
+            DataTree.copyStat(response.getStat(), stat);
+        }
+        return response.getData();
+    }
+
+    /**
+     * The asynchronous version of getConfig: reads ZooDefs.CONFIG_NODE and hands
+     * the given DataCallback and context to the connection's packet queue.
+     *
+     * @param watcher explicit watcher to leave on the configuration node, or null for no watch
+     * @param cb callback queued with the request
+     * @param ctx context object queued with the request for the callback
+     * @see #getConfig(Watcher, Stat)
+     */
+    public void getConfig(Watcher watcher,
+            DataCallback cb, Object ctx)
+    {
+        final String configZnode = ZooDefs.CONFIG_NODE;
+        final boolean watching = (watcher != null);
+
+        // the watch contains the un-chroot path
+        WatchRegistration wcb = watching
+                ? new DataWatchRegistration(watcher, configZnode)
+                : null;
+
+        GetDataRequest request = new GetDataRequest();
+        request.setPath(configZnode);
+        request.setWatch(watching);
+
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.getData);
+
+        cnxn.queuePacket(h, new ReplyHeader(), request, new GetDataResponse(), cb,
+                configZnode, configZnode, ctx, wcb);
+    }
+
+    
+    /**
+     * Return the last committed configuration (as known to the server to which the client is connected)
+     * and the stat of the configuration.
+     * <p>
+     * If watch is true and the call is successful (no exception is thrown), the
+     * default watcher is left on the configuration node (ZooDefs.CONFIG_NODE). The watch
+     * will be triggered by a successful reconfig operation.
+     * <p>
+     * A KeeperException with error code KeeperException.NoNode will be thrown
+     * if the configuration node doesn't exist.
+     *
+     * @param watch whether to leave the default watcher on the configuration node
+     * @param stat if non-null, filled in with the stat of the configuration node ZooDefs.CONFIG_NODE
+     * @return configuration data stored in ZooDefs.CONFIG_NODE
+     * @throws KeeperException If the server signals an error with a non-zero error code
+     * @throws InterruptedException If the server transaction is interrupted.
+     */
+    public byte[] getConfig(boolean watch, Stat stat)
+            throws KeeperException, InterruptedException {
+        Watcher watcher = null;
+        if (watch) {
+            watcher = watchManager.defaultWatcher;
+        }
+        return getConfig(watcher, stat);
+    }
+ 
+    /**
+     * The Asynchronous version of getConfig. 
+     * 
+     * @see #getData(String, boolean, Stat)
+     */
+    public void getConfig(boolean watch, DataCallback cb, Object ctx) {
+        getConfig(watch ? watchManager.defaultWatcher : null, cb, ctx);
+    }
+    
+    /**
+     * Reconfigure - add/remove servers. Return the new configuration.
+     * @param joiningServers
+     *                a comma separated list of servers being added (incremental reconfiguration)
+     * @param leavingServers
+     *                a comma separated list of servers being removed (incremental reconfiguration)
+     * @param newMembers
+     *                a comma separated list of new membership (non-incremental reconfiguration)
+     * @param fromConfig
+     *                version of the current configuration (optional - causes reconfiguration to throw
+     *                an exception if configuration is no longer current; the CLI passes -1 when no
+     *                version is given)
+     * @param stat
+     *                if non-null, filled in with the stat returned by the server
+     * @return new configuration
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws KeeperException If the server signals an error with a non-zero error code.
+     */
+    public byte[] reconfig(String joiningServers, String leavingServers, String newMembers, long fromConfig, Stat stat) throws KeeperException, InterruptedException
+    {
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.reconfig);
+        ReconfigRequest request = new ReconfigRequest(joiningServers, leavingServers, newMembers, fromConfig);
+        GetDataResponse response = new GetDataResponse();
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()), "");
+        }
+        // stat is optional, as in getConfig(Watcher, Stat); copying into null would NPE
+        if (stat != null) {
+            DataTree.copyStat(response.getStat(), stat);
+        }
+        return response.getData();
+    }
+
+    /**
+     * Convenience wrapper around reconfig that takes Lists of strings instead of comma-separated servers.
+     * Each list is joined with "," via StringUtils.joinStrings; a null list is passed on as null.
+     *
+     * @see #reconfig(String, String, String, long, Stat)
+     */
+    public byte[] reconfig(List<String> joiningServers, List<String> leavingServers, List<String> newMembers, long fromConfig, Stat stat) throws KeeperException, InterruptedException
+    {
+        final String delim = ",";
+        String joining = StringUtils.joinStrings(joiningServers, delim);
+        String leaving = StringUtils.joinStrings(leavingServers, delim);
+        String members = StringUtils.joinStrings(newMembers, delim);
+        return reconfig(joining, leaving, members, fromConfig, stat);
+    }
+
+    /**
+     * The Asynchronous version of reconfig. The request is queued with
+     * ZooDefs.CONFIG_NODE as its path and no watch registration.
+     *
+     * @see #reconfig(String, String, String, long, Stat)
+     */
+    public void reconfig(String joiningServers, String leavingServers, String newMembers, long fromConfig, DataCallback cb, Object ctx) throws KeeperException, InterruptedException
+    {
+        final String configZnode = ZooDefs.CONFIG_NODE;
+        ReconfigRequest request = new ReconfigRequest(joiningServers, leavingServers, newMembers, fromConfig);
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.reconfig);
+        cnxn.queuePacket(h, new ReplyHeader(), request, new GetDataResponse(), cb,
+                configZnode, configZnode, ctx, null);
+    }
+ 
+    /**
+     * Convenience wrapper around asynchronous reconfig that takes Lists of strings instead of
+     * comma-separated servers. Each list is joined with "," via StringUtils.joinStrings;
+     * a null list is passed on as null.
+     *
+     * @see #reconfig(String, String, String, long, DataCallback, Object)
+     */
+    public void reconfig(List<String> joiningServers, List<String> leavingServers, List<String> newMembers, long fromConfig, DataCallback cb, Object ctx) throws KeeperException, InterruptedException
+    {
+        final String delim = ",";
+        String joining = StringUtils.joinStrings(joiningServers, delim);
+        String leaving = StringUtils.joinStrings(leavingServers, delim);
+        String members = StringUtils.joinStrings(newMembers, delim);
+        reconfig(joining, leaving, members, fromConfig, cb, ctx);
+    }
+   
+    /**
      * Set the data for the node of the given path if such a node exists and the
      * given version matches the version of the node (if the given version is
      * -1, it matches any node's versions). Return the stat of the node.

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Thu Mar  7 06:01:49 2013
@@ -49,9 +49,11 @@ import org.apache.zookeeper.cli.DeleteAl
 import org.apache.zookeeper.cli.DeleteCommand;
 import org.apache.zookeeper.cli.GetAclCommand;
 import org.apache.zookeeper.cli.GetCommand;
+import org.apache.zookeeper.cli.GetConfigCommand;
 import org.apache.zookeeper.cli.ListQuotaCommand;
 import org.apache.zookeeper.cli.Ls2Command;
 import org.apache.zookeeper.cli.LsCommand;
+import org.apache.zookeeper.cli.ReconfigCommand;
 import org.apache.zookeeper.cli.SetAclCommand;
 import org.apache.zookeeper.cli.SetCommand;
 import org.apache.zookeeper.cli.SetQuotaCommand;
@@ -105,6 +107,8 @@ public class ZooKeeperMain {
         new ListQuotaCommand().addToMap(commandMapCli);
         new DelQuotaCommand().addToMap(commandMapCli);
         new AddAuthCommand().addToMap(commandMapCli);
+        new ReconfigCommand().addToMap(commandMapCli);
+        new GetConfigCommand().addToMap(commandMapCli);
         
         // add all to commandMap
         for (Entry<String, CliCommand> entry : commandMapCli.entrySet()) {
@@ -583,6 +587,13 @@ public class ZooKeeperMain {
             System.err.println("Arguments are not valid : "+e.getPath());
         }catch (KeeperException.BadVersionException e) {
             System.err.println("version No is not valid : "+e.getPath());
+        }catch (KeeperException.ReconfigInProgress e) {
+            System.err.println("Another reconfiguration is in progress -- concurrent " +
+                   "reconfigs not supported (yet)");
+        }catch (KeeperException.NewConfigNoQuorum e) {
+            System.err.println("No quorum of new config is connected and " +
+                   "up-to-date with the leader of last commmitted config - try invoking reconfiguration after " +
+                   "new servers are connected and synced");
         }
         return false;
     }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java Thu Mar  7 06:01:49 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.cli;
+
+import org.apache.commons.cli.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.util.ConfigUtils;
+
+/**
+ * config command for cli: prints the configuration returned by
+ * {@link org.apache.zookeeper.ZooKeeper#getConfig(boolean, Stat)}.
+ */
+public class GetConfigCommand extends CliCommand {
+
+    private static Options options = new Options();
+    private String[] args;
+    private CommandLine cl;
+
+    // Populate the shared (static) option table once at class-load time instead of
+    // re-adding the same options every time an instance is constructed.
+    static {
+        options.addOption("s", false, "stats");
+        options.addOption("w", false, "watch");
+        options.addOption("c", false, "client connection string");
+    }
+
+    public GetConfigCommand() {
+        super("config", "[-c] [-w] [-s]");
+    }
+
+    /**
+     * Parses the command line. Recognized flags: -s (print stat), -w (leave the
+     * default watch), -c (print the result of ConfigUtils.getClientConfigStr).
+     *
+     * @throws ParseException if the options cannot be parsed or no arguments remain
+     */
+    @Override
+    public CliCommand parse(String[] cmdArgs) throws ParseException {
+        Parser parser = new PosixParser();
+        cl = parser.parse(options, cmdArgs);
+        args = cl.getArgs();
+        if (args.length < 1) {
+            throw new ParseException(getUsageStr());
+        }
+        return this;
+    }
+
+    /**
+     * Fetches the configuration and prints it (or its client form with -c),
+     * optionally followed by its stat (-s).
+     *
+     * @return whether -w was given
+     */
+    @Override
+    public boolean exec() throws KeeperException, InterruptedException {
+        boolean watch = cl.hasOption("w");
+        Stat stat = new Stat();
+        byte[] data = zk.getConfig(watch, stat);
+        data = (data == null) ? "null".getBytes() : data;
+        if (cl.hasOption("c")) {
+            out.println(ConfigUtils.getClientConfigStr(new String(data)));
+        } else {
+            out.println(new String(data));
+        }
+
+        if (cl.hasOption("s")) {
+            new StatPrinter(out).print(stat);
+        }
+
+        return watch;
+    }
+}

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java Thu Mar  7 06:01:49 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.cli;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.cli.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+/**
+ * reconfig command for cli
+ */
+public class ReconfigCommand extends CliCommand {
+
+    private static Options options = new Options();
+
+    /* joining - comma separated list of server config strings for servers to be added to the ensemble.
+     * Each entry is identical in syntax as it would appear in a configuration file. Only used for 
+     * incremental reconfigurations.
+     */
+    private String joining;
+
+    /* leaving - comma separated list of server IDs to be removed from the ensemble. Only used for
+     * incremental reconfigurations.
+     */
+    private String leaving;
+
+    /* members - comma separated list of new membership information (e.g., contents of a membership
+     * configuration file) - for use only with a non-incremental reconfiguration. This may be specified
+     * manually via the -members flag or it will automatically be filled in by reading the contents
+     * of an actual configuration file using the -file flag.
+     */
+    private String members;
+
+    /* version - version of config from which we want to reconfigure - if current config is different
+     * reconfiguration will fail. Should be ommitted from the CLI to disable this option.
+     */
+    long version = -1;
+    private CommandLine cl;
+
+    {
+        options.addOption("s", false, "stats");
+        options.addOption("v", true, "required current config version");
+        options.addOption("file", true, "path of config file to parse for membership");
+        options.addOption("members", true, "comma-separated list of config strings for " +
+        		"non-incremental reconfig");
+        options.addOption("add", true, "comma-separated list of config strings for " +
+        		"new servers");
+        options.addOption("remove", true, "comma-separated list of server IDs to remove");
+    }
+
+    public ReconfigCommand() {
+        super("reconfig", "[-s] " +
+        		"[-v version] " +
+        		"[[-file path] | " +
+        		"[-members serverID=host:port1:port2;port3[,...]*]] | " +
+        		"[-add serverId=host:port1:port2;port3[,...]]* " +
+        		"[-remove serverId[,...]*]");
+    }
+
+    @Override
+    public CliCommand parse(String[] cmdArgs) throws ParseException {
+        joining = null;
+        leaving = null;
+        members = null;
+        Parser parser = new PosixParser();
+        cl = parser.parse(options, cmdArgs);
+        if (!cl.hasOption("file") && !cl.hasOption("add") && !cl.hasOption("remove")) {
+            throw new ParseException(getUsageStr());
+        }
+        if (cl.hasOption("v")) {
+            try{ 
+                version = Long.parseLong(cl.getOptionValue("v"), 16);
+            } catch (NumberFormatException e){
+                throw new ParseException("-v must be followed by a long (configuration version)");
+            }
+        } else {
+            version = -1;
+        }
+
+        // Simple error checking for conflicting modes
+        if ((cl.hasOption("file") || cl.hasOption("members")) && (cl.hasOption("add") || cl.hasOption("remove"))) {
+            throw new ParseException("Can't use -file or -members together with -add or -remove (mixing incremental" +
+            		" and non-incremental modes is not allowed)");
+        }
+        if (cl.hasOption("file") && cl.hasOption("members"))
+        {
+            throw new ParseException("Can't use -file and -members together (conflicting non-incremental modes)");
+        }
+
+        // Set the joining/leaving/members values based on the mode we're in
+        if (cl.hasOption("add")) {
+           joining = cl.getOptionValue("add").toLowerCase();
+        }
+        if (cl.hasOption("remove")) {
+           leaving = cl.getOptionValue("remove").toLowerCase();
+        }
+        if (cl.hasOption("members")) {
+           members = cl.getOptionValue("members").toLowerCase();
+        }
+        if (cl.hasOption("file")) {
+            try {           
+                FileInputStream inConfig = new FileInputStream(cl.getOptionValue("file"));
+                QuorumPeerConfig config = new QuorumPeerConfig();                       
+                Properties dynamicCfg = new Properties();
+                try {
+                    dynamicCfg.load(inConfig);
+                } finally {
+                    inConfig.close();
+                }
+                //check that membership makes sense; leader will make these checks again
+                //don't check for leader election ports since 
+                //client doesn't know what leader election alg is used
+                config.parseDynamicConfig(dynamicCfg, 0, true);                               
+                members = config.getQuorumVerifier().toString();                    
+            } catch (Exception e) {
+                throw new ParseException("Error processing " + cl.getOptionValue("file") + e.getMessage());                
+            } 
+        }
+        return this;
+    }
+
+    @Override
+    public boolean exec() throws KeeperException, InterruptedException {
+        try {
+            Stat stat = new Stat();
+            byte[] curConfig = zk.reconfig(joining,
+                    leaving, members, version, stat);
+            out.println("Committed new configuration:\n" + new String(curConfig));
+            
+            if (cl.hasOption("s")) {
+                new StatPrinter(out).print(stat);
+            }
+        } catch (KeeperException ex) {
+            err.println(ex.getMessage());
+        }
+        return false;
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java Thu Mar  7 06:01:49 2013
@@ -41,4 +41,23 @@ public class StringUtils {
         }
         return Collections.unmodifiableList(results);
     }
+    
+    /**
+     * This method takes a List<String> and a delimiter and joins the strings
+     * into a single string, where the original strings are separated using 
+     * the given delimiter.
+     *
+     */ 
+    public static String joinStrings(List<String> list, String delim)
+    {
+        if (list == null)
+            return null;
+
+       StringBuilder builder = new StringBuilder(list.get(0));
+        for (String s : list.subList(1, list.size())) {
+            builder.append(delim).append(s);
+        }
+
+        return builder.toString();
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Thu Mar  7 06:01:49 2013
@@ -43,6 +43,7 @@ import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.StatsTrack;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -107,6 +108,16 @@ public class DataTree {
             .substring(procZookeeper.length() + 1);
 
     /**
+     * the zookeeper config node that acts as the config management node for
+     * zookeeper (its path is ZooDefs.CONFIG_NODE)
+     */
+    private static final String configZookeeper = ZooDefs.CONFIG_NODE;
+
+    /**
+     * the name under which the config node is stored as a child of /zookeeper:
+     * configZookeeper with the procZookeeper prefix and the following '/' removed
+     * (assumes CONFIG_NODE lives directly under procZookeeper - TODO confirm)
+     */
+    private static final String configChildZookeeper = configZookeeper
+            .substring(procZookeeper.length() + 1);
+    
+    /**
      * the path trie that keeps track fo the quota nodes in this datatree
      */
     private final PathTrie pTrie = new PathTrie();
@@ -254,6 +265,13 @@ public class DataTree {
      */
     private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
 
+    /**
+     * the /zookeeper/config node for maintaining the configuration (membership and quorum system)
+     * info for zookeeper; created with empty data and registered at configZookeeper by addConfigNode()
+     */
+    private DataNode configDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
+
+    
     public DataTree() {
         /* Rather than fight it, let root have an alias */
         nodes.put("", root);
@@ -265,8 +283,20 @@ public class DataTree {
 
         procDataNode.addChild(quotaChildZookeeper);
         nodes.put(quotaZookeeper, quotaDataNode);
+        
+        addConfigNode();
     }
 
+    /**
+     * Registers configDataNode in the node map under configZookeeper and links it
+     * as a child of the /zookeeper node. If /zookeeper is missing, an error is
+     * logged but the config node is still put into the node map.
+     */
+    public void addConfigNode() {
+        DataNode zookeeperZnode = nodes.get(procZookeeper);
+        if (zookeeperZnode == null) {
+            // /zookeeper is expected to always exist at this point
+            LOG.error("There's no /zookeeper znode - this should never happen");
+        } else {
+            zookeeperZnode.addChild(configChildZookeeper);
+        }
+        nodes.put(configZookeeper, configDataNode);
+    }
+
     /**
      * is the path one of the special paths owned by zookeeper.
      *
@@ -276,7 +306,7 @@ public class DataTree {
      */
     boolean isSpecialPath(String path) {
         if (rootZookeeper.equals(path) || procZookeeper.equals(path)
-                || quotaZookeeper.equals(path)) {
+                || quotaZookeeper.equals(path) || configZookeeper.equals(path)) {
             return true;
         }
         return false;
@@ -798,6 +828,7 @@ public class DataTree {
                     rc.path = deleteTxn.getPath();
                     deleteNode(deleteTxn.getPath(), header.getZxid());
                     break;
+                case OpCode.reconfig:
                 case OpCode.setData:
                     SetDataTxn setDataTxn = (SetDataTxn) txn;
                     rc.path = setDataTxn.getPath();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Mar  7 06:01:49 2013
@@ -53,7 +53,7 @@ import org.apache.zookeeper.proto.SyncRe
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
-import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -237,6 +237,12 @@ public class FinalRequestProcessor imple
                 rsp = new SetDataResponse(rc.stat);
                 err = Code.get(rc.err);
                 break;
+            }           
+            case OpCode.reconfig: {
+                lastOp = "RECO";               
+                rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
+                err = Code.get(rc.err);
+                break;
             }
             case OpCode.setACL: {
                 lastOp = "SETA";