Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/11/17 15:03:06 UTC

svn commit: r1410731 [1/2] - in /zookeeper/trunk: ./ src/c/ src/c/include/ src/c/src/ src/c/tests/ src/docs/src/documentation/content/xdocs/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/client/ src/java/test/org/apache/zookeep...

Author: fpj
Date: Sat Nov 17 14:03:03 2012
New Revision: 1410731

URL: http://svn.apache.org/viewvc?rev=1410731&view=rev
Log:
ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)


Added:
    zookeeper/trunk/src/c/src/addrvec.c
    zookeeper/trunk/src/c/src/addrvec.h
    zookeeper/trunk/src/c/tests/TestReconfig.cc
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/c/Makefile.am
    zookeeper/trunk/src/c/include/zookeeper.h
    zookeeper/trunk/src/c/src/mt_adaptor.c
    zookeeper/trunk/src/c/src/st_adaptor.c
    zookeeper/trunk/src/c/src/zk_adaptor.h
    zookeeper/trunk/src/c/src/zookeeper.c
    zookeeper/trunk/src/c/tests/TestZookeeperClose.cc
    zookeeper/trunk/src/c/tests/TestZookeeperInit.cc
    zookeeper/trunk/src/c/tests/ZKMocks.cc
    zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Nov 17 14:03:03 2012
@@ -6,6 +6,9 @@ BUGFIXES:
 
 Backward compatible changes:
 
+NEW FEATURES:
+  ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString

Modified: zookeeper/trunk/src/c/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/Makefile.am?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/Makefile.am (original)
+++ zookeeper/trunk/src/c/Makefile.am Sat Nov 17 14:03:03 2012
@@ -19,7 +19,8 @@ libhashtable_la_SOURCES = $(HASHTABLE_SR
 COMMON_SRC = src/zookeeper.c include/zookeeper.h include/zookeeper_version.h include/zookeeper_log.h\
     src/recordio.c include/recordio.h include/proto.h \
     src/zk_adaptor.h generated/zookeeper.jute.c \
-    src/zookeeper_log.h src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c
+    src/zookeeper_log.h src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c \
+	src/addrvec.h src/addrvec.c
 
 # These are the symbols (classes, mostly) we want to export from our library.
 EXPORT_SYMBOLS = '(zoo_|zookeeper_|zhandle|Z|format_log_message|log_message|logLevel|deallocate_|zerror|is_unrecoverable)'
@@ -70,13 +71,26 @@ endif
 EXTRA_DIST+=$(wildcard ${srcdir}/tests/*.cc) $(wildcard ${srcdir}/tests/*.h) \
     ${srcdir}/tests/wrappers.opt ${srcdir}/tests/wrappers-mt.opt
 
-TEST_SOURCES = tests/TestDriver.cc tests/LibCMocks.cc tests/LibCSymTable.cc \
-    tests/MocksBase.cc  tests/ZKMocks.cc tests/Util.cc tests/ThreadingUtil.cc \
+# These tests are ordered in a logical manner such that each builds upon basic
+# functionality tested in prior tests. E.g., the most basic functionality is
+# covered in TestZookeeperInit and TestZookeeperClose, so those are tested
+# first as a foundation, with more complex test suites to follow.
+TEST_SOURCES = \
+	tests/TestDriver.cc \
+	tests/LibCMocks.cc \
+	tests/LibCSymTable.cc \
+	tests/MocksBase.cc \
+	tests/ZKMocks.cc \
+	tests/Util.cc \
+	tests/ThreadingUtil.cc \
+	tests/TestZookeeperInit.cc \
+	tests/TestZookeeperClose.cc \
+	tests/TestReconfig.cc \
     tests/TestClientRetry.cc \
-    tests/TestOperations.cc tests/TestZookeeperInit.cc \
-    tests/TestZookeeperClose.cc tests/TestClient.cc \
-    tests/TestMulti.cc tests/TestWatchers.cc
-
+	tests/TestOperations.cc \
+	tests/TestMulti.cc \
+	tests/TestClient.cc \
+	tests/TestWatchers.cc
 
 SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
 

Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Sat Nov 17 14:03:03 2012
@@ -184,6 +184,7 @@ extern ZOOAPI const int ZOO_AUTH_FAILED_
 extern ZOOAPI const int ZOO_CONNECTING_STATE;
 extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
 extern ZOOAPI const int ZOO_CONNECTED_STATE;
+extern ZOOAPI const int ZOO_NOTCONNECTED_STATE;
 // @}
 
 /**
@@ -450,6 +451,63 @@ ZOOAPI zhandle_t *zookeeper_init(const c
   int recv_timeout, const clientid_t *clientid, void *context, int flags);
 
 /**
+ * \brief update the list of servers this client will connect to.
+ * 
+ * This method allows a client to update the connection string by providing
+ * a new comma separated list of host:port pairs, each corresponding to a
+ * ZooKeeper server. 
+ * 
+ * This function invokes a probabilistic load-balancing algorithm which may cause
+ * the client to disconnect from its current host to achieve expected uniform 
+ * connections per server in the new list. If the current host to which the
+ * client is connected is not in the new list, this call will always cause the
+ * connection to be dropped. Otherwise, the decision is based on whether the 
+ * number of servers has increased or decreased and by how much.
+ * 
+ * If the connection is dropped, the client moves to a special "reconfig" mode
+ * where it chooses a new server to connect to using the probabilistic algorithm.
+ * After finding a server or exhaustively trying all the servers in the new list,
+ * the client moves back to the normal mode of operation where it will pick an
+ * arbitrary server from the 'host' string.
+ * 
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the 
+ * protocol and its evaluation.
+ * 
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ *   server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZSYSTEMERROR - a system (OS) error occurred; it's worth checking errno to get details
+ */
+ZOOAPI int zoo_set_servers(zhandle_t *zh, const char *hosts);
+
+/**
+ * \brief cycle to the next server on the next connection attempt.
+ * 
+ * Note: typically this method should NOT be used outside of testing.
+ *
+ * This method allows a client to cycle through the list of servers in its
+ * connection pool to be used on the next connection attempt. This function does
+ * not actually trigger a connection or state change in any way. Its purpose is
+ * to allow testing changing servers on the fly and the probabilistic load
+ * balancing algorithm.
+ */
+ZOOAPI void zoo_cycle_next_server(zhandle_t *zh);
+
+/**
+ * \brief get current host:port this client is connecting/connected to.
+ * 
+ * Note: typically this method should NOT be used outside of testing.
+ *
+ * This method allows a client to get the current host:port that this client
+ * is either in the process of connecting to or is currently connected to. This
+ * is mainly used for testing purposes but might also come in handy as a general
+ * purpose tool to be used by other clients.
+ */
+ZOOAPI const char* zoo_get_current_server(zhandle_t* zh);
+
+/**
  * \brief close the zookeeper handle and free up any resources.
  * 
  * After this call, the client session will no longer be valid. The function

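For readers of the new client API above, a minimal usage sketch (hypothetical host strings, error handling mostly elided; only zoo_set_servers, zoo_cycle_next_server and zoo_get_current_server come from the header, the rest is illustrative):

    #include <stdio.h>
    #include <zookeeper.h>

    /* Sketch: push a new server list to a live handle and report which
     * endpoint the client is now connecting/connected to. */
    static void update_server_list(zhandle_t *zh)
    {
        /* Hypothetical comma separated host:port list */
        int rc = zoo_set_servers(zh, "10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181");
        if (rc != ZOK) {
            fprintf(stderr, "zoo_set_servers failed: %d\n", rc);
            return;
        }

        /* Mainly for tests: step to the next candidate server and print it */
        zoo_cycle_next_server(zh);
        printf("current server: %s\n", zoo_get_current_server(zh));
    }
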
Added: zookeeper/trunk/src/c/src/addrvec.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.c?rev=1410731&view=auto
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.c (added)
+++ zookeeper/trunk/src/c/src/addrvec.c Sat Nov 17 14:03:03 2012
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+
+#include "addrvec.h"
+
+#define ADDRVEC_DEFAULT_GROW_AMOUNT 16
+
+void addrvec_init(addrvec_t *avec)
+{
+    assert(avec);
+    avec->next = 0;
+    avec->count = 0;
+    avec->capacity = 0;
+    avec->data = NULL;
+}
+
+void addrvec_free(addrvec_t *avec)
+{
+    if (avec == NULL)
+    {
+        return;
+    }
+
+    avec->next = 0;
+    avec->count = 0;
+    avec->capacity = 0;
+    if (avec->data) {
+        free(avec->data);
+        avec->data = NULL;
+    }
+}
+
+int addrvec_alloc(addrvec_t *avec)
+{
+    addrvec_init(avec);
+    return addrvec_grow_default(avec);
+}
+
+int addrvec_alloc_capacity(addrvec_t* avec, uint32_t capacity)
+{
+    addrvec_init(avec);
+    return addrvec_grow(avec, capacity);
+}
+
+int addrvec_grow(addrvec_t *avec, uint32_t grow_amount)
+{
+    assert(avec);
+
+    if (grow_amount == 0)
+    {
+        return 0;
+    }
+
+    // Save off old data and capacity in case there is a realloc failure
+    unsigned int old_capacity = avec->capacity;
+    struct sockaddr_storage *old_data = avec->data;
+
+    avec->capacity += grow_amount;
+    avec->data = realloc(avec->data, sizeof(*avec->data) * avec->capacity);
+    if (avec->data == NULL)
+    {
+        avec->capacity = old_capacity;
+        avec->data = old_data;
+        errno = ENOMEM;
+        return 1;
+    }
+
+    return 0;
+}
+
+int addrvec_grow_default(addrvec_t *avec)
+{
+    return addrvec_grow(avec, ADDRVEC_DEFAULT_GROW_AMOUNT);
+}
+
+static int addrvec_grow_if_full(addrvec_t *avec)
+{
+    assert(avec);
+    if (avec->count == avec->capacity)
+    {
+        int rc = addrvec_grow_default(avec);
+        if (rc != 0)
+        {
+            return rc;
+        }
+    }
+
+    return 0;
+}
+
+int addrvec_contains(const addrvec_t *avec, const struct sockaddr_storage *addr)
+{
+    if (!avec || !addr)
+    { 
+        return 0;
+    }
+
+    int i = 0;
+    for (i = 0; i < avec->count; i++)
+    {
+        if(memcmp(&avec->data[i], addr, INET_ADDRSTRLEN) == 0)
+            return 1;
+    }
+
+    return 0;
+}
+
+int addrvec_append(addrvec_t *avec, const struct sockaddr_storage *addr)
+{
+    assert(avec);
+    assert(addr);
+
+    int rc = addrvec_grow_if_full(avec);
+    if (rc != 0)
+    {
+        return rc;
+    }
+
+    // Copy addrinfo into address list
+    memcpy(avec->data + avec->count, addr, sizeof(*addr));
+    ++avec->count;
+
+    return 0;
+}
+
+int addrvec_append_addrinfo(addrvec_t *avec, const struct addrinfo *addrinfo)
+{
+    assert(avec);
+    assert(addrinfo);
+
+    int rc = addrvec_grow_if_full(avec);
+    if (rc != 0)
+    {
+        return rc;
+    }
+
+    // Copy addrinfo into address list
+    memcpy(avec->data + avec->count, addrinfo->ai_addr, addrinfo->ai_addrlen);
+    ++avec->count;
+
+    return 0;
+}
+
+void addrvec_shuffle(addrvec_t *avec)
+{
+    int i = 0;
+    for (i = avec->count - 1; i > 0; --i) {
+        long int j = random()%(i+1);
+        if (i != j) {
+            struct sockaddr_storage t = avec->data[i];
+            avec->data[i] = avec->data[j];
+            avec->data[j] = t;
+        }
+    }
+}
+
+int addrvec_hasnext(const addrvec_t *avec)
+{
+    return avec->count > 0 && (avec->next < avec->count);
+}
+
+int addrvec_atend(const addrvec_t *avec)
+{
+    return avec->count > 0 && avec->next >= avec->count;
+}
+
+void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next)
+{
+    // If we're at the end of the list, then reset index to start
+    if (addrvec_atend(avec))
+    {
+        avec->next = 0;
+    }
+
+    if (!addrvec_hasnext(avec))
+    {
+        next = NULL;
+        return;
+    }
+
+    *next = avec->data[avec->next++];
+}
+
+int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2)
+{
+    if (a1->count != a2->count)
+    {
+        return 0;
+    }
+
+    int i;
+    for (i = 0; i < a1->count; ++i)
+    {
+        if (!addrvec_contains(a2, &a1->data[i]))
+            return 0;
+    }
+
+    return 1;
+}

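As an illustration of the iteration semantics implemented above (addrvec_next wraps back to the first entry once addrvec_atend reports true), a caller might walk a vector like this; the zeroed placeholder addresses are for the sketch only:

    #include <string.h>
    #include "addrvec.h"

    /* Sketch: append two placeholder addresses, visit each once, and show
     * where the wrap-around happens. */
    static void walk_addrvec(void)
    {
        addrvec_t avec;
        struct sockaddr_storage a, cur;
        memset(&a, 0, sizeof(a));

        addrvec_alloc(&avec);            /* default capacity of 16 */
        addrvec_append(&avec, &a);
        addrvec_append(&avec, &a);

        while (addrvec_hasnext(&avec)) {
            addrvec_next(&avec, &cur);   /* copies the next entry into cur */
        }

        /* addrvec_atend() is now true; one more addrvec_next() call would
         * reset the index and hand back the first entry again. */
        addrvec_free(&avec);
    }
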
Added: zookeeper/trunk/src/c/src/addrvec.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.h?rev=1410731&view=auto
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.h (added)
+++ zookeeper/trunk/src/c/src/addrvec.h Sat Nov 17 14:03:03 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ADDRVEC_H_
+#define ADDRVEC_H_
+
+#include <inttypes.h>
+
+#ifndef WIN32
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#endif
+
+/**
+ * This structure represents a list of addresses. It stores the count of the
+ * number of elements that have been inserted via calls to addrvec_append and
+ * addrvec_append_addrinfo. It also has a capacity field for the number of 
+ * addresses it can hold without needing to be enlarged.
+ */
+typedef struct _addrvec {
+    unsigned int next;                        // next index to use
+    unsigned int count;                       // number of addresses in this list
+    unsigned int capacity;                    // number of addresses this list can hold
+    struct sockaddr_storage *data;   // list of addresses
+} addrvec_t;
+
+/**
+ * Initialize an addrvec by clearing out all its state.
+ */
+void addrvec_init(addrvec_t *avec);
+
+/**
+ * Free any memory used internally by an addrvec
+ */
+void addrvec_free(addrvec_t *avec);
+
+/**
+ * Allocate an addrvec with a default capacity (16)
+ */
+int addrvec_alloc(addrvec_t *avec);
+
+/**
+ * Allocates an addrvec with a specified capacity
+ */
+int addrvec_alloc_capacity(addrvec_t *avec, uint32_t capacity);
+
+/**
+ * Grow an addrvec by the specified amount. This will increase the capacity
+ * of the vector and not the contents.
+ */
+int addrvec_grow(addrvec_t *avec, uint32_t grow_amount);
+
+/**
+ * Similar to addrvec_grow but uses a default growth amount of 16.
+ */
+int addrvec_grow_default(addrvec_t *avec);
+
+/**
+ * Check if an addrvec contains the specified sockaddr_storage value.
+ * \returns 1 if it contains the value and 0 otherwise.
+ */
+int addrvec_contains(const addrvec_t *avec, const struct sockaddr_storage *addr);
+
+/**
+ * Append the given sockaddr_storage pointer into the addrvec. The contents of
+ * the given 'addr' are copied into the addrvec via memcpy.
+ */
+int addrvec_append(addrvec_t *avec, const struct sockaddr_storage *addr);
+
+/**
+ * Append the given addrinfo pointer into the addrvec. The contents of the given
+ * 'addrinfo' are copied into the addrvec via memcpy.
+ */
+int addrvec_append_addrinfo(addrvec_t *avec, const struct addrinfo *addrinfo);
+
+/**
+ * Shuffle the addrvec so that its internal list of addresses is randomized.
+ * Uses random() and assumes it has been properly seeded.
+ */
+void addrvec_shuffle(addrvec_t *avec);
+
+/**
+ * Determine if the addrvec has a next element (i.e. it's safe to call addrvec_next)
+ * 
+ * \returns 1 if it has a next element and 0 otherwise
+ */
+int addrvec_hasnext(const addrvec_t *avec);
+
+/**
+ * Determine if the addrvec is at the end or not. Specifically, this means a
+ * subsequent call to addrvec_next will loop around to the start again.
+ */
+int addrvec_atend(const addrvec_t *avec);
+
+/**
+ * Get the next entry from the addrvec and update the associated index. 
+ * 
+ * If the current index points at (or after) the last element in the vector then
+ * it will loop back around and start at the beginning of the list.
+ */
+void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next);
+
+/**
+ * Compare two addrvecs for equality. 
+ * 
+ * \returns 1 if the contents of the two lists are identical and 0 otherwise.
+ */
+int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2);
+
+#endif // ADDRVEC_H
+
+
+

Modified: zookeeper/trunk/src/c/src/mt_adaptor.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/mt_adaptor.c?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/mt_adaptor.c (original)
+++ zookeeper/trunk/src/c/src/mt_adaptor.c Sat Nov 17 14:03:03 2012
@@ -255,6 +255,7 @@ int adaptor_init(zhandle_t *zh)
     zh->adaptor_priv = adaptor_threads;
     pthread_mutex_init(&zh->to_process.lock,0);
     pthread_mutex_init(&adaptor_threads->zh_lock,0);
+    pthread_mutex_init(&adaptor_threads->reconfig_lock,0);
     // to_send must be recursive mutex    
     pthread_mutexattr_init(&recursive_mx_attr);
     pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
@@ -515,6 +516,19 @@ __attribute__((constructor)) int32_t get
     return fetch_and_add(&xid,1);
 }
 
+void lock_reconfig(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if(adaptor)
+        pthread_mutex_lock(&adaptor->reconfig_lock);
+}
+void unlock_reconfig(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if(adaptor)
+        pthread_mutex_unlock(&adaptor->reconfig_lock);
+}
+
 void enter_critical(zhandle_t* zh)
 {
     struct adaptor_threads *adaptor = zh->adaptor_priv;

Modified: zookeeper/trunk/src/c/src/st_adaptor.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/st_adaptor.c?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/st_adaptor.c (original)
+++ zookeeper/trunk/src/c/src/st_adaptor.c Sat Nov 17 14:03:03 2012
@@ -95,5 +95,9 @@ int32_t get_xid()
     }
     return xid++;
 }
+
+void lock_reconfig(struct _zhandle *zh){}
+void unlock_reconfig(struct _zhandle *zh){}
+
 void enter_critical(zhandle_t* zh){}
 void leave_critical(zhandle_t* zh){}

Modified: zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ zookeeper/trunk/src/c/src/zk_adaptor.h Sat Nov 17 14:03:03 2012
@@ -28,6 +28,7 @@
 #endif
 #include "zookeeper.h"
 #include "zk_hashtable.h"
+#include "addrvec.h"
 
 /* predefined xid's values recognized as special by the server */
 #define WATCHER_EVENT_XID -1 
@@ -156,10 +157,11 @@ struct prime_struct {
 struct adaptor_threads {
      pthread_t io;
      pthread_t completion;
-     int threadsToWait;         // barrier
-     pthread_cond_t cond;       // barrier's conditional
-     pthread_mutex_t lock;      // ... and a lock
-     pthread_mutex_t zh_lock;   // critical section lock
+     int threadsToWait;             // barrier
+     pthread_cond_t cond;           // barrier's conditional
+     pthread_mutex_t lock;          // ... and a lock
+     pthread_mutex_t zh_lock;       // critical section lock
+     pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
 #ifdef WIN32
      SOCKET self_pipe[2];
 #else
@@ -179,52 +181,71 @@ typedef struct _auth_list_head {
 /**
  * This structure represents the connection to zookeeper.
  */
-
 struct _zhandle {
 #ifdef WIN32
-    SOCKET fd; /* the descriptor used to talk to zookeeper */
+    SOCKET fd;                          // the descriptor used to talk to zookeeper
 #else
-    int fd; /* the descriptor used to talk to zookeeper */
+    int fd;                             // the descriptor used to talk to zookeeper
 #endif
-    char *hostname; /* the hostname of zookeeper */
-    struct sockaddr_storage *addrs; /* the addresses that correspond to the hostname */
-    int addrs_count; /* The number of addresses in the addrs array */
-    watcher_fn watcher; /* the registered watcher */
-    struct timeval last_recv; /* The time that the last message was received */
-    struct timeval last_send; /* The time that the last message was sent */
-    struct timeval last_ping; /* The time that the last PING was sent */
-    struct timeval next_deadline; /* The time of the next deadline */
-    int recv_timeout; /* The maximum amount of time that can go by without 
-     receiving anything from the zookeeper server */
-    buffer_list_t *input_buffer; /* the current buffer being read in */
-    buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
-    buffer_head_t to_send; /* The packets queued to send */
-    completion_head_t sent_requests; /* The outstanding requests */
-    completion_head_t completions_to_process; /* completions that are ready to run */
-    int connect_index; /* The index of the address to connect to */
-    clientid_t client_id;
-    long long last_zxid;
-    int outstanding_sync; /* Number of outstanding synchronous requests */
-    struct _buffer_list primer_buffer; /* The buffer used for the handshake at the start of a connection */
-    struct prime_struct primer_storage; /* the connect response */
-    char primer_storage_buffer[40]; /* the true size of primer_storage */
-    volatile int state;
-    void *context;
-    auth_list_head_t auth_h; /* authentication data list */
+
+    // Hostlist and list of addresses
+    char *hostname;                     // hostname contains list of zookeeper servers to connect to
+    struct sockaddr_storage addr_cur;   // address of server we're currently connecting/connected to 
+
+    addrvec_t addrs;                    // current list of addresses we're connected to
+    addrvec_t addrs_old;                // old list of addresses that we are no longer connected to
+    addrvec_t addrs_new;                // new list of addresses to connect to if we're reconfiguring
+
+    int reconfig;                       // Are we in the process of reconfiguring cluster's ensemble
+    double pOld, pNew;                  // Probability for selecting between 'addrs_old' and 'addrs_new'
+    int delay;                          // Delay next connect attempt after failing on the last server in the list
+
+    watcher_fn watcher;                 // the registered watcher
+
+    // Message timings
+    struct timeval last_recv;           // time last message was received
+    struct timeval last_send;           // time last message was sent
+    struct timeval last_ping;           // time last PING was sent
+    struct timeval next_deadline;       // time of the next deadline
+    int recv_timeout;                   // max receive timeout for messages from server
+
+    // Buffers
+    buffer_list_t *input_buffer;        // current buffer being read in
+    buffer_head_t to_process;           // buffers that have been read and ready to be processed
+    buffer_head_t to_send;              // packets queued to send
+    completion_head_t sent_requests;    // outstanding requests
+    completion_head_t completions_to_process; // completions that are ready to run
+    int outstanding_sync;               // number of outstanding synchronous requests
+
+    // State info
+    volatile int state;                 // Current zookeeper state
+    void *context;                      // client-side provided context
+    clientid_t client_id;               // client-id
+    long long last_zxid;                // last zookeeper ID
+    auth_list_head_t auth_h;            // authentication data list
+
+    // Primer storage
+    struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection
+    struct prime_struct primer_storage; // the connect response
+    char primer_storage_buffer[40];     // the true size of primer_storage
+
     /* zookeeper_close is not reentrant because it de-allocates the zhandler. 
      * This guard variable is used to defer the destruction of zhandle till 
      * right before top-level API call returns to the caller */
     int32_t ref_counter;
     volatile int close_requested;
     void *adaptor_priv;
+
     /* Used for debugging only: non-zero value indicates the time when the zookeeper_process
      * call returned while there was at least one unprocessed server response 
      * available in the socket recv buffer */
     struct timeval socket_readable;
-    
+
+    // Watchers
     zk_hashtable* active_node_watchers;   
     zk_hashtable* active_exist_watchers;
     zk_hashtable* active_child_watchers;
+
     /** used for chroot path at the client side **/
     char *chroot;
 };
@@ -246,13 +267,19 @@ void free_duplicate_path(const char* fre
 void zoo_lock_auth(zhandle_t *zh);
 void zoo_unlock_auth(zhandle_t *zh);
 
+// ensemble reconfigure access guards
+void lock_reconfig(struct _zhandle *zh);
+void unlock_reconfig(struct _zhandle *zh);
+
 // critical section guards
 void enter_critical(zhandle_t* zh);
 void leave_critical(zhandle_t* zh);
+
 // zhandle object reference counting
 void api_prolog(zhandle_t* zh);
 int api_epilog(zhandle_t *zh, int rc);
 int32_t get_xid();
+
 // returns the new value of the ref counter
 int32_t inc_ref_counter(zhandle_t* zh,int i);
 

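The reconfig_lock and lock_reconfig/unlock_reconfig guards declared above are used throughout this patch around every access to the address-list fields; a minimal sketch of the calling pattern (the helper itself is hypothetical, the pattern mirrors update_addrs and zoo_cycle_next_server in zookeeper.c):

    #include "zk_adaptor.h"

    /* Sketch: any reader or writer of {hostname, addr_cur, addrs, addrs_old,
     * addrs_new} takes the reconfig guard first. */
    static int count_known_servers(zhandle_t *zh)
    {
        int n;
        lock_reconfig(zh);
        n = zh->addrs.count;    /* safe against a concurrent zoo_set_servers() */
        unlock_reconfig(zh);
        return n;
    }
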
Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Sat Nov 17 14:03:03 2012
@@ -74,6 +74,8 @@ const int ZOO_AUTH_FAILED_STATE = AUTH_F
 const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
 const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
 const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
+const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
+
 static __attribute__ ((unused)) const char* state2String(int state){
     switch(state){
     case 0:
@@ -176,7 +178,6 @@ typedef struct _completion_list {
 const char*err2string(int err);
 static int queue_session_event(zhandle_t *zh, int state);
 static const char* format_endpoint_info(const struct sockaddr_storage* ep);
-static const char* format_current_endpoint_info(zhandle_t* zh);
 
 /* deserialize forward declarations */
 static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
@@ -398,14 +399,12 @@ static void destroy(zhandle_t *zh)
     if (zh->fd != -1) {
         close(zh->fd);
         zh->fd = -1;
+        memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
         zh->state = 0;
     }
-    if (zh->addrs != 0) {
-        free(zh->addrs);
-        zh->addrs = NULL;
-    }
+    addrvec_free(&zh->addrs);
 
-    if (zh->chroot != 0) {
+    if (zh->chroot != NULL) {
         free(zh->chroot);
         zh->chroot = NULL;
     }
@@ -429,6 +428,7 @@ static void setup_random()
         close(fd);
     }
     srandom(seed);
+    srand48(seed);
 #endif
 }
 
@@ -456,31 +456,67 @@ static int getaddrinfo_errno(int rc) { 
 #endif
 
 /**
- * fill in the addrs array of the zookeeper servers in the zhandle. after filling
- * them in, we will permute them for load balancing.
+ * Count the number of hosts in the connection host string. This assumes it's
+ * a well-formed connection string whereby each host is separated by a comma.
  */
-int getaddrs(zhandle_t *zh)
+static int count_hosts(char *hosts)
 {
-    char *hosts = strdup(zh->hostname);
-    char *host;
-    char *strtok_last;
-    struct sockaddr_storage *addr;
-    int i;
-    int rc;
-    int alen = 0; /* the allocated length of the addrs array */
+    if (!hosts || strlen(hosts) == 0) {
+        return 0;
+    }
+
+    uint32_t count = 0;
+    char *loc = hosts;
+
+    while ((loc = strchr(loc, ','))) {
+        count++;
+        loc+=1;
+    }
+
+    return count+1;
+}
+
+/**
+ * Resolve hosts and populate the provided address vector with the shuffled
+ * results. The address vector is first initialized to an empty state before
+ * being filled.
+ */
+int resolve_hosts(const char *hosts_in, addrvec_t *avec)
+{
+    int rc = ZOK;
 
-    zh->addrs_count = 0;
-    if (zh->addrs) {
-        free(zh->addrs);
-        zh->addrs = 0;
+    if (hosts_in == NULL || avec == NULL) {
+        return ZBADARGUMENTS;
     }
-    if (!hosts) {
-         LOG_ERROR(("out of memory"));
+
+    // initialize address vector
+    addrvec_init(avec);
+
+    char *hosts = strdup(hosts_in);
+    if (hosts == NULL) {
+        LOG_ERROR(("out of memory"));
         errno=ENOMEM;
-        return ZSYSTEMERROR;
+        rc=ZSYSTEMERROR;
+        goto fail;
     }
-    zh->addrs = 0;
-    host=strtok_r(hosts, ",", &strtok_last);
+
+    int num_hosts = count_hosts(hosts);
+    if (num_hosts == 0) {
+        free(hosts);
+        return ZOK;
+    }
+
+    // Allocate list inside avec
+    rc = addrvec_alloc_capacity(avec, num_hosts);
+    if (rc != 0) {
+        LOG_ERROR(("out of memory"));
+        errno=ENOMEM;
+        rc=ZSYSTEMERROR;
+        goto fail;
+    }
+
+    char *strtok_last;
+    char * host = strtok_r(hosts, ",", &strtok_last);
     while(host) {
         char *port_spec = strrchr(host, ':');
         char *end_port_spec;
@@ -516,26 +552,25 @@ int getaddrs(zhandle_t *zh)
             goto fail;
         }
 
-        /* Setup the address array */
+        // Setup the address array
         for(ptr = he->h_addr_list;*ptr != 0; ptr++) {
-            if (zh->addrs_count == alen) {
-                alen += 16;
-                zh->addrs = realloc(zh->addrs, sizeof(*zh->addrs)*alen);
-                if (zh->addrs == 0) {
+            if (addrs->count == addrs->capacity) {
+                rc = addrvec_grow_default(addrs);
+                if (rc != 0) {
                     LOG_ERROR(("out of memory"));
                     errno=ENOMEM;
                     rc=ZSYSTEMERROR;
                     goto fail;
                 }
             }
-            addr = &zh->addrs[zh->addrs_count];
+            addr = &addrs->list[addrs->count];
             addr4 = (struct sockaddr_in*)addr;
             addr->ss_family = he->h_addrtype;
             if (addr->ss_family == AF_INET) {
                 addr4->sin_port = htons(port);
                 memset(&addr4->sin_zero, 0, sizeof(addr4->sin_zero));
                 memcpy(&addr4->sin_addr, *ptr, he->h_length);
-                zh->addrs_count++;
+                zh->addrs.count++;
             }
 #if defined(AF_INET6)
             else if (addr->ss_family == AF_INET6) {
@@ -546,12 +581,12 @@ int getaddrs(zhandle_t *zh)
                 addr6->sin6_scope_id = 0;
                 addr6->sin6_flowinfo = 0;
                 memcpy(&addr6->sin6_addr, *ptr, he->h_length);
-                zh->addrs_count++;
+                zh->addrs.count++;
             }
 #endif
             else {
                 LOG_WARN(("skipping unknown address family %x for %s",
-                         addr->ss_family, zh->hostname));
+                         addr->ss_family, hosts_in));
             }
         }
         host = strtok_r(0, ",", &strtok_last);
@@ -607,32 +642,27 @@ int getaddrs(zhandle_t *zh)
 
         for (res = res0; res; res = res->ai_next) {
             // Expand address list if needed
-            if (zh->addrs_count == alen) {
-                void *tmpaddr;
-                alen += 16;
-                tmpaddr = realloc(zh->addrs, sizeof(*zh->addrs)*alen);
-                if (tmpaddr == 0) {
+            if (avec->count == avec->capacity) {
+                rc = addrvec_grow_default(avec);
+                if (rc != 0) {
                     LOG_ERROR(("out of memory"));
                     errno=ENOMEM;
                     rc=ZSYSTEMERROR;
                     goto fail;
                 }
-                zh->addrs=tmpaddr;
             }
 
             // Copy addrinfo into address list
-            addr = &zh->addrs[zh->addrs_count];
             switch (res->ai_family) {
             case AF_INET:
 #if defined(AF_INET6)
             case AF_INET6:
 #endif
-                memcpy(addr, res->ai_addr, res->ai_addrlen);
-                ++zh->addrs_count;
+                addrvec_append_addrinfo(avec, res);
                 break;
             default:
                 LOG_WARN(("skipping unknown address family %x for %s",
-                res->ai_family, zh->hostname));
+                          res->ai_family, hosts_in));
                 break;
             }
         }
@@ -647,25 +677,179 @@ int getaddrs(zhandle_t *zh)
 
     if(!disable_conn_permute){
         setup_random();
-        /* Permute */
-        for (i = zh->addrs_count - 1; i > 0; --i) {
-            long int j = random()%(i+1);
-            if (i != j) {
-                struct sockaddr_storage t = zh->addrs[i];
-                zh->addrs[i] = zh->addrs[j];
-                zh->addrs[j] = t;
+        addrvec_shuffle(avec);
+    }
+
+    return ZOK;
+
+fail:
+    addrvec_free(avec);
+
+    if (hosts) {
+        free(hosts);
+        hosts = NULL;
+    }
+
+    return rc;
+}
+
+/**
+ * Updates the list of servers and determine if changing connections is necessary.
+ * Permutes server list for proper load balancing.
+ *
+ * Changing connections is necessary if one of the following holds:
+ * a) the server this client is currently connected to is not in the new address list.
+ *    Otherwise (if currentHost is in the new list):
+ * b) the number of servers in the cluster is increasing - in this case the load
+ *    on currentHost should decrease, which means that SOME of the clients 
+ *    connected to it will migrate to the new servers. The decision whether this
+ *    client migrates or not is probabilistic so that the expected number of 
+ *    clients connected to each server is the same.
+ *    
+ * If reconfig is set to true, the function sets pOld and pNew, which correspond
+ * to the probability of migrating to one of the new servers or to one of the old
+ * servers (migrating to one of the old servers is done only if our client's
+ * currentHost is not in the new list).
+ * 
+ * See zoo_cycle_next_server for the selection logic.
+ * 
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the 
+ * protocol and its evaluation.
+ */
+int update_addrs(zhandle_t *zh)
+{
+    // Verify we have a valid handle
+    if (zh == NULL) {
+        return ZBADARGUMENTS;
+    }
+
+    // zh->hostname should always be set
+    if (zh->hostname == NULL)
+    {
+        return ZSYSTEMERROR;
+    } 
+
+    // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+    lock_reconfig(zh);
+
+    int rc = ZOK;
+    char *hosts = NULL;
+
+    // Copy zh->hostname for local use
+    hosts = strdup(zh->hostname);
+    if (hosts == NULL) {
+        rc = ZSYSTEMERROR;
+        goto fail;
+    }
+
+    addrvec_t resolved = { 0 };
+    rc = resolve_hosts(hosts, &resolved);
+    if (rc != ZOK)
+    {
+        goto fail;
+    }
+
+    // If the addrvec list is identical to last time we ran don't do anything
+    if (addrvec_eq(&zh->addrs, &resolved))
+    {
+        goto fail;
+    }
+
+    // Is the server we're connected to in the new resolved list?
+    int found_current = addrvec_contains(&resolved, &zh->addr_cur);
+
+    // Clear out old and new address lists
+    zh->reconfig = 1;
+    addrvec_free(&zh->addrs_old);
+    addrvec_free(&zh->addrs_new);
+
+    // Divide server list into addrs_old if in previous list and addrs_new if not
+    int i = 0;
+    for (i = 0; i < resolved.count; i++)
+    {
+        struct sockaddr_storage *resolved_address = &resolved.data[i];
+        if (addrvec_contains(&zh->addrs, resolved_address))
+        {
+            rc = addrvec_append(&zh->addrs_old, resolved_address);
+            if (rc != ZOK)
+            {
+                goto fail;
+            }
+        } 
+        else {
+            rc = addrvec_append(&zh->addrs_new, resolved_address);
+            if (rc != ZOK)
+            {
+                goto fail;
             }
         }
     }
-    return ZOK;
+
+    int num_old = zh->addrs_old.count;
+    int num_new = zh->addrs_new.count;
+
+    // Number of servers increased
+    if (num_old + num_new > zh->addrs.count)
+    {
+        if (found_current) {
+            // my server is in the new config, but load should be decreased.
+            // Need to decide if the client is moving to one of the new servers
+            if (drand48() <= (1 - ((double)zh->addrs.count) / (num_old + num_new))) {
+                zh->pNew = 1;
+                zh->pOld = 0;
+            } else {
+                // do nothing special -- stay with the current server
+                zh->reconfig = 0;
+            }
+        } else {
+            // my server is not in the new config, and load on old servers must 
+            // be decreased, so connect to one of the new servers
+            zh->pNew = 1;
+            zh->pOld = 0;
+        }
+    } 
+
+    // Number of servers stayed the same or decreased
+    else {
+        if (found_current) {
+            // my server is in the new config, and load should be increased, so
+            // stay with this server and do nothing special
+            zh->reconfig = 0;
+        } else {
+            zh->pOld = ((double) (num_old * (zh->addrs.count - (num_old + num_new)))) / ((num_old + num_new) * (zh->addrs.count - num_old));
+            zh->pNew = 1 - zh->pOld;
+        }
+    }
+
+    addrvec_free(&zh->addrs);
+    zh->addrs = resolved;
+
+    // If we need to do a reconfig and we're currently connected to a server, 
+    // then force close that connection so on next interest() call we'll make a 
+    // new connection
+    if (zh->reconfig == 1 && zh->fd != -1)
+    {
+        close(zh->fd);
+        zh->fd = -1;
+        zh->state = ZOO_NOTCONNECTED_STATE;
+    }
+
 fail:
-    if (zh->addrs) {
-        free(zh->addrs);
-        zh->addrs=0;
+
+    unlock_reconfig(zh);
+
+    // If we short-circuited out and never assigned resolved to zh->addrs then we 
+    // need to free resolved to avoid a memleak.
+    if (zh->addrs.data != resolved.data)
+    {
+        addrvec_free(&resolved);
     }
+
     if (hosts) {
         free(hosts);
+        hosts = NULL;
     }
+
     return rc;
 }
 
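To make the probabilities computed above concrete, here is a small worked example with made-up numbers (not part of the patch): the client previously knew 5 servers, the update shrinks the ensemble to 4, of which 2 were already known (addrs_old) and 2 are new (addrs_new), and the currently connected server was removed, so the shrinking-ensemble branch runs:

    #include <stdio.h>

    /* Illustrative arithmetic only -- mirrors the pOld/pNew formula above. */
    int main(void)
    {
        int total_old = 5;   /* zh->addrs.count before the update          */
        int num_old   = 2;   /* resolved servers that were already known   */
        int num_new   = 2;   /* resolved servers not seen before           */

        double pOld = ((double)(num_old * (total_old - (num_old + num_new))))
                    / ((num_old + num_new) * (total_old - num_old));
        double pNew = 1 - pOld;

        /* Prints pOld=0.1667 pNew=0.8333: most migrating clients go to the
         * two new servers, a few rebalance onto the two surviving old ones. */
        printf("pOld=%.4f pNew=%.4f\n", pOld, pNew);
        return 0;
    }

In the growing-ensemble case the code above instead migrates with probability 1 - addrs.count/(num_old + num_new), which keeps the expected per-server load uniform.
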
@@ -791,8 +975,9 @@ zhandle_t *zookeeper_init(const char *ho
     if (!zh) {
         return 0;
     }
+    zh->hostname = NULL;
     zh->fd = -1;
-    zh->state = NOTCONNECTED_STATE_DEF;
+    zh->state = ZOO_NOTCONNECTED_STATE;
     zh->context = context;
     zh->recv_timeout = recv_timeout;
     init_auth_info(&zh->auth_h);
@@ -805,8 +990,7 @@ zhandle_t *zookeeper_init(const char *ho
         errno=EINVAL;
         goto abort;
     }
-    //parse the host to get the chroot if
-    //available
+    //parse the host to get the chroot if available
     index_chroot = strchr(host, '/');
     if (index_chroot) {
         zh->chroot = strdup(index_chroot);
@@ -835,10 +1019,9 @@ zhandle_t *zookeeper_init(const char *ho
     if (zh->hostname == 0) {
         goto abort;
     }
-    if(getaddrs(zh)!=0) {
+    if(update_addrs(zh) != 0) {
         goto abort;
     }
-    zh->connect_index = 0;
     if (clientid) {
         memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
     } else {
@@ -869,6 +1052,134 @@ abort:
 }
 
 /**
+ * Set a new list of zk servers to connect to.  Disconnect will occur if
+ * the current connection endpoint is not in the list.
+ */
+int zoo_set_servers(zhandle_t *zh, const char *hosts)
+{
+    if (hosts == NULL)
+    {
+        LOG_ERROR(("New server list cannot be empty"));
+        return ZBADARGUMENTS;
+    }
+
+    // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+    lock_reconfig(zh);
+
+    // Reset hostname to new set of hosts to connect to
+    if (zh->hostname) {
+        free(zh->hostname);
+    }
+
+    zh->hostname = strdup(hosts);
+
+    unlock_reconfig(zh);
+
+    return update_addrs(zh);
+}
+
+/**
+ * Get the next server to connect to, when in 'reconfig' mode, which means that
+ * we've updated the server list to connect to, and are now trying to find some
+ * server to connect to. Once we get successfully connected, 'reconfig' mode is
+ * set to false. Similarly, if we tried to connect to all servers in new config
+ * and failed, 'reconfig' mode is set to false. 
+ *
+ * While in 'reconfig' mode, we should connect to a server in the new set of
+ * servers (addrs_new) with probability pNew and to servers in the old set of 
+ * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
+ * out all servers in either, we continue to try servers from the other set, 
+ * regardless of pNew or pOld. If we tried all servers we give up and go back to
+ * the normal round-robin mode.
+ * 
+ * When called, must be protected by lock_reconfig(zh).
+ */
+static int get_next_server_in_reconfig(zhandle_t *zh)
+{
+    int take_new = drand48() <= zh->pNew;
+
+    LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
+               zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next, 
+               addrvec_hasnext(&zh->addrs_old)));
+    LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
+               zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next, 
+               addrvec_hasnext(&zh->addrs_new)));
+
+    // Take one of the new servers if we haven't tried them all yet
+    // and either the probability tells us to connect to one of the new servers
+    // or if we already tried them all then use one of the old servers
+    if (addrvec_hasnext(&zh->addrs_new) 
+            && (take_new || !addrvec_hasnext(&zh->addrs_old)))
+    {
+        addrvec_next(&zh->addrs_new, &zh->addr_cur);
+        LOG_DEBUG(("Using next from NEW=%s", format_endpoint_info(&zh->addr_cur)));
+        return 0;
+    }
+
+    // start taking old servers
+    if (addrvec_hasnext(&zh->addrs_old)) {
+        addrvec_next(&zh->addrs_old, &zh->addr_cur);
+        LOG_DEBUG(("Using next from OLD=%s", format_endpoint_info(&zh->addr_cur)));
+        return 0;
+    }
+
+    LOG_DEBUG(("Failed to find either new or old"));
+    memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
+    return 1;
+}
+
+/** 
+ * Cycle through our server list to the correct 'next' server. The 'next' server
+ * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
+ * mode means we've updated the server list and are now trying to find a server
+ * to connect to. Once we get connected, we are no longer in the reconfig mode.
+ * Similarly, if we try to connect to all the servers in the new configuration
+ * and failed, reconfig mode is set to false.
+ * 
+ * For more algorithm details, see get_next_server_in_reconfig.
+ */
+void zoo_cycle_next_server(zhandle_t *zh)
+{
+    // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+    lock_reconfig(zh);
+
+    memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
+
+    if (zh->reconfig)
+    {
+        if (get_next_server_in_reconfig(zh) == 0) {
+            unlock_reconfig(zh);
+            return;
+        }
+
+        // tried all new and old servers and couldn't connect
+        zh->reconfig = 0;
+    }
+
+    addrvec_next(&zh->addrs, &zh->addr_cur);
+
+    unlock_reconfig(zh);
+
+    return;
+}
+
+/** 
+ * Get the host:port for the server we are currently connecting to or connected
+ * to. This is largely for testing purposes but is also generally useful for
+ * other client software built on top of this client.
+ */
+const char* zoo_get_current_server(zhandle_t* zh)
+{
+    // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+    // Need the lock here as it is changed in update_addrs()
+    lock_reconfig(zh);
+
+    const char * endpoint_info = format_endpoint_info(&zh->addr_cur);
+    unlock_reconfig(zh);
+    return endpoint_info;
+}
+
+/**
 * deallocate the free_path only if it has been allocated
  * and not equal to path
  */
@@ -1233,7 +1544,15 @@ static void handle_error(zhandle_t *zh,i
     }
     cleanup_bufs(zh,1,rc);
     zh->fd = -1;
-    zh->connect_index++;
+
+    LOG_DEBUG(("Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay));
+
+    // NOTE: If we're at the end of the list of addresses to connect to, then
+    // we want to delay the next connection attempt to avoid spinning.
+    // Then increment what host we'll connect to since we failed to connect to current
+    zh->delay = addrvec_atend(&zh->addrs);
+    addrvec_next(&zh->addrs, &zh->addr_cur);
+
     if (!is_unrecoverable(zh)) {
         zh->state = 0;
     }
@@ -1252,7 +1571,7 @@ static int handle_socket_error_msg(zhand
         vsnprintf(buf, sizeof(buf)-1,format,va);
         log_message(ZOO_LOG_LEVEL_ERROR,line,__func__,
             format_log_message("Socket [%s] zk retcode=%d, errno=%d(%s): %s",
-            format_current_endpoint_info(zh),rc,errno,strerror(errno),buf));
+            zoo_get_current_server(zh),rc,errno,strerror(errno),buf));
         va_end(va);
     }
     handle_error(zh,rc);
@@ -1335,7 +1654,7 @@ static int send_auth_info(zhandle_t *zh)
         auth = auth->next;
     }
     zoo_unlock_auth(zh);
-    LOG_DEBUG(("Sending all auth info request to %s", format_current_endpoint_info(zh)));
+    LOG_DEBUG(("Sending all auth info request to %s", zoo_get_current_server(zh)));
     return (rc <0) ? ZMARSHALLINGERROR:ZOK;
 }
 
@@ -1352,7 +1671,7 @@ static int send_last_auth_info(zhandle_t
     }
     rc = send_info_packet(zh, auth);
     zoo_unlock_auth(zh);
-    LOG_DEBUG(("Sending auth info request to %s",format_current_endpoint_info(zh)));
+    LOG_DEBUG(("Sending auth info request to %s",zoo_get_current_server(zh)));
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }
 
@@ -1399,7 +1718,7 @@ static int send_set_watches(zhandle_t *z
     free_key_list(req.dataWatches.data, req.dataWatches.count);
     free_key_list(req.existWatches.data, req.existWatches.count);
     free_key_list(req.childWatches.data, req.childWatches.count);
-    LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh)));
+    LOG_DEBUG(("Sending set watches request to %s",zoo_get_current_server(zh)));
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }
 
@@ -1540,7 +1859,6 @@ static struct timeval get_timeval(int in
 int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
      struct timeval *tv)
 {
-
     ULONG nonblocking_flag = 1;
 #else
 int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
@@ -1561,16 +1879,39 @@ int zookeeper_interest(zhandle_t *zh, in
             LOG_WARN(("Exceeded deadline by %dms", time_left));
     }
     api_prolog(zh);
+
+    int rc = update_addrs(zh);
+    if (rc != ZOK) {
+        return api_epilog(zh, rc);
+    }
+
     *fd = zh->fd;
     *interest = 0;
     tv->tv_sec = 0;
     tv->tv_usec = 0;
+
     if (*fd == -1) {
-        if (zh->connect_index == zh->addrs_count) {
-            /* Wait a bit before trying again so that we don't spin */
-            zh->connect_index = 0;
-        }else {
-            int rc;
+
+        /* 
+         * If we previously failed to connect to server pool (zh->delay == 1)
+         * then we need to delay our connection on this iteration by 1/60 of the
+         * recv timeout before trying again so we don't spin.
+         *
+         * We always clear the delay setting. If we fail again, we'll set delay
+         * again and on the next iteration we'll do the same.
+         */
+        if (zh->delay == 1) {
+            *tv = get_timeval(zh->recv_timeout/60);
+            zh->delay = 0;
+
+            LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
+                     zh->hostname));
+        } 
+
+        // No need to delay -- grab the next server and attempt connection
+        else {
+            zoo_cycle_next_server(zh);
+
 #ifdef WIN32
             char enable_tcp_nodelay = 1;
 #else
@@ -1578,7 +1919,7 @@ int zookeeper_interest(zhandle_t *zh, in
 #endif
             int ssoresult;
 
-            zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0);
+            zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
             if (zh->fd < 0) {
                 return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                                                              ZSYSTEMERROR, "socket() call failed"));
@@ -1593,41 +1934,44 @@ int zookeeper_interest(zhandle_t *zh, in
             fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
 #endif
 #if defined(AF_INET6)
-            if (zh->addrs[zh->connect_index].ss_family == AF_INET6) {
-                rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in6));
+            if (zh->addr_cur.ss_family == AF_INET6) {
+                rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
             } else {
 #else
                LOG_DEBUG(("[zk] connect()\n"));
             {
 #endif
-                rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in));
+                rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
 #ifdef WIN32
                 get_errno();
 #endif
             }
             if (rc == -1) {
+
                 /* we are handling the non-blocking connect according to
                  * the description in section 16.3 "Non-blocking connect"
                  * in UNIX Network Programming vol 1, 3rd edition */
                 if (errno == EWOULDBLOCK || errno == EINPROGRESS)
                     zh->state = ZOO_CONNECTING_STATE;
                 else
+                {
                     return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                             ZCONNECTIONLOSS,"connect() call failed"));
+                }
             } else {
                 if((rc=prime_connection(zh))!=0)
                     return api_epilog(zh,rc);
 
-                LOG_INFO(("Initiated connection to server [%s]",
-                        format_endpoint_info(&zh->addrs[zh->connect_index])));
+                LOG_INFO(("Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
             }
+            *tv = get_timeval(zh->recv_timeout/3);
         }
         *fd = zh->fd;
-        *tv = get_timeval(zh->recv_timeout/3);
         zh->last_recv = now;
         zh->last_send = now;
         zh->last_ping = now;
     }
+
     if (zh->fd != -1) {
         int idle_recv = calculate_interval(&zh->last_recv, &now);
         int idle_send = calculate_interval(&zh->last_send, &now);
@@ -1646,7 +1990,7 @@ int zookeeper_interest(zhandle_t *zh, in
             return api_epilog(zh,handle_socket_error_msg(zh,
                     __LINE__,ZOPERATIONTIMEOUT,
                     "connection to %s timed out (exceeded timeout by %dms)",
-                    format_endpoint_info(&zh->addrs[zh->connect_index]),
+                    format_endpoint_info(&zh->addr_cur),
                     -recv_to));
 
         }
@@ -1656,8 +2000,8 @@ int zookeeper_interest(zhandle_t *zh, in
             send_to = zh->recv_timeout/3 - idle_send;
             if (send_to <= 0 && zh->sent_requests.head==0) {
 //                LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
-//                                format_current_endpoint_info(zh),-send_to));
-                int rc=send_ping(zh);
+//                                zoo_get_current_server(zh),-send_to));
+                rc = send_ping(zh);
                 if (rc < 0){
                     LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
                     return api_epilog(zh,rc);
@@ -1677,7 +2021,7 @@ int zookeeper_interest(zhandle_t *zh, in
         /* we are interested in a write if we are connected and have something
          * to send, or we are waiting for a connect to finish. */
         if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
-        || zh->state == ZOO_CONNECTING_STATE) {
+            || zh->state == ZOO_CONNECTING_STATE) {
             *interest |= ZOOKEEPER_WRITE;
         }
     }
@@ -1701,10 +2045,11 @@ static int check_events(zhandle_t *zh, i
             return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
                 "server refused to accept the client");
         }
+
         if((rc=prime_connection(zh))!=0)
             return rc;
-        LOG_INFO(("initiated connection to server [%s]",
-                format_endpoint_info(&zh->addrs[zh->connect_index])));
+
+        LOG_INFO(("initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
         return ZOK;
     }
     if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
@@ -1745,12 +2090,13 @@ static int check_events(zhandle_t *zh, i
                 } else {
                     zh->recv_timeout = zh->primer_storage.timeOut;
                     zh->client_id.client_id = newid;
-                 
+
                     memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
                            sizeof(zh->client_id.passwd));
                     zh->state = ZOO_CONNECTED_STATE;
+                    zh->reconfig = 0;
                     LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
-                              format_endpoint_info(&zh->addrs[zh->connect_index]),
+                              format_endpoint_info(&zh->addr_cur),
                               newid, zh->recv_timeout));
                     /* we want the auth to be sent for, but since both call push to front
                        we need to call send_watch_set first */
@@ -1866,7 +2212,7 @@ static void process_sync_completion(
         completion_list_t *cptr,
         struct sync_completion *sc,
         struct iarchive *ia,
-	zhandle_t *zh)
+    zhandle_t *zh)
 {
     LOG_DEBUG(("Processing sync_completion with type=%d xid=%#x rc=%d",
             cptr->c.type, cptr->xid, sc->rc));
@@ -2274,9 +2620,9 @@ int zookeeper_process(zhandle_t *zh, int
                 struct sync_completion
                         *sc = (struct sync_completion*)cptr->data;
                 sc->rc = rc;
-                
+
                 process_sync_completion(cptr, sc, ia, zh); 
-                
+
                 notify_sync_completion(sc);
                 free_buffer(bptr);
                 zh->outstanding_sync--;
@@ -2359,7 +2705,7 @@ static completion_list_t* create_complet
     }
     c->xid = xid;
     c->watcher = wo;
-    
+
     return c;
 }
 
@@ -2489,7 +2835,7 @@ int zookeeper_close(zhandle_t *zh)
          * completions from calling zookeeper_close before we have
          * completed the adaptor_finish call below. */
 
-	/* Signal any syncronous completions before joining the threads */
+    /* Signal any synchronous completions before joining the threads */
         enter_critical(zh);
         free_completions(zh,1,ZCLOSING);
         leave_critical(zh);
@@ -2506,7 +2852,7 @@ int zookeeper_close(zhandle_t *zh)
         struct oarchive *oa;
         struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_CLOSE_OP)};
         LOG_INFO(("Closing zookeeper sessionId=%#llx to [%s]\n",
-                zh->client_id.client_id,format_current_endpoint_info(zh)));
+                zh->client_id.client_id,zoo_get_current_server(zh)));
         oa = create_buffer_oarchive();
         rc = serialize_RequestHeader(oa, "header", &h);
         rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
@@ -2589,7 +2935,7 @@ static int Request_path_init(zhandle_t *
         char **path_out, const char *path)
 {
     assert(path_out);
-    
+
     *path_out = prepend_string(zh, path);
     if (zh == NULL || !isValidPath(*path_out, flags)) {
         free_duplicate_path(*path_out, path);
@@ -2656,7 +3002,7 @@ int zoo_awget(zhandle_t *zh, const char 
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2701,7 +3047,7 @@ int zoo_aset(zhandle_t *zh, const char *
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2757,7 +3103,7 @@ int zoo_acreate(zhandle_t *zh, const cha
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2797,7 +3143,7 @@ int zoo_adelete(zhandle_t *zh, const cha
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2836,7 +3182,7 @@ int zoo_awexists(zhandle_t *zh, const ch
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2869,7 +3215,7 @@ static int zoo_awget_children_(zhandle_t
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2917,7 +3263,7 @@ static int zoo_awget_children2_(zhandle_
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2960,7 +3306,7 @@ int zoo_async(zhandle_t *zh, const char 
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2990,7 +3336,7 @@ int zoo_aget_acl(zhandle_t *zh, const ch
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3021,7 +3367,7 @@ int zoo_aset_acl(zhandle_t *zh, const ch
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
-            format_current_endpoint_info(zh)));
+            zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3033,16 +3379,16 @@ static void op_result_string_completion(
     struct zoo_op_result *result = (struct zoo_op_result *)data;
     assert(result);
     result->err = err;
-    
+
     if (result->value && value) {
         int len = strlen(value) + 1;
-		if (len > result->valuelen) {
-			len = result->valuelen;
-		}
-		if (len > 0) {
-			memcpy(result->value, value, len - 1);
-			result->value[len - 1] = '\0';
-		}
+        if (len > result->valuelen) {
+            len = result->valuelen;
+        }
+        if (len > 0) {
+            memcpy(result->value, value, len - 1);
+            result->value[len - 1] = '\0';
+        }
     } else {
         result->value = NULL;
     }
@@ -3100,7 +3446,7 @@ int zoo_amulti(zhandle_t *zh, int count,
 
         struct MultiHeader mh = { STRUCT_INITIALIZER(type, op->type), STRUCT_INITIALIZER(done, 0), STRUCT_INITIALIZER(err, -1) };
         rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
-     
+
         switch(op->type) {
             case ZOO_CREATE_OP: {
                 struct CreateRequest req;
@@ -3111,7 +3457,7 @@ int zoo_amulti(zhandle_t *zh, int count,
                                         op->create_op.flags);
                 rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
                 result->value = op->create_op.buf;
-				result->valuelen = op->create_op.buflen;
+                result->valuelen = op->create_op.buflen;
 
                 enter_critical(zh);
                 entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0); 
@@ -3169,19 +3515,19 @@ int zoo_amulti(zhandle_t *zh, int count,
     }
 
     rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
-  
+
     /* BEGIN: CRTICIAL SECTION */
     enter_critical(zh);
     rc = rc < 0 ? rc : add_multi_completion(zh, h.xid, completion, data, &clist);
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
-    
+
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(("Sending multi request xid=%#x with %d subrequests to %s",
-            h.xid, index, format_current_endpoint_info(zh)));
+            h.xid, index, zoo_get_current_server(zh)));
     /* make a best (non-blocking) effort to send the requests asap */
     adaptor_send_queue(zh, 0);
 
@@ -3234,12 +3580,12 @@ void zoo_check_op_init(zoo_op_t *op, con
 int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
 {
     int rc;
- 
+
     struct sync_completion *sc = alloc_sync_completion();
     if (!sc) {
         return ZSYSTEMERROR;
     }
-   
+
     rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
     if (rc == ZOK) {
         wait_sync_completion(sc);
@@ -3422,8 +3768,8 @@ int zoo_add_auth(zhandle_t *zh,const cha
 
 static const char* format_endpoint_info(const struct sockaddr_storage* ep)
 {
-    static char buf[128];
-    char addrstr[128];
+    static char buf[128] = { 0 };
+    char addrstr[128] = { 0 };
     void *inaddr;
 #ifdef WIN32
     char * addrstring;
@@ -3453,11 +3799,6 @@ static const char* format_endpoint_info(
     return buf;
 }
 
-static const char* format_current_endpoint_info(zhandle_t* zh)
-{
-    return format_endpoint_info(&zh->addrs[zh->connect_index]);
-}
-
 void zoo_deterministic_conn_order(int yesOrNo)
 {
     disable_conn_permute=yesOrNo;

Added: zookeeper/trunk/src/c/tests/TestReconfig.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReconfig.cc?rev=1410731&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReconfig.cc (added)
+++ zookeeper/trunk/src/c/tests/TestReconfig.cc Sat Nov 17 14:03:03 2012
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <iostream>
+#include <sstream>
+#include <arpa/inet.h>
+#include <exception>
+#include <stdlib.h>
+
+#include "Util.h"
+#include "LibCMocks.h"
+#include "ZKMocks.h"
+
+using namespace std;
+
+static const int portOffset = 2000;
+
+class Client
+{
+
+private:    
+    // Member variables
+    zhandle_t *zh;
+    unsigned int seed;
+
+public:
+    /**
+     * Create a client with the given connection host string and seed. Clients
+     * are created via createClient() in the fixture below, which stores them
+     * and disconnects and cleans them up in tearDown().
+     */
+    Client(const string hosts, unsigned int seed) :
+        seed((seed * seed) + 0xAFAFAFAF)
+    {
+        reSeed();
+
+        zh = zookeeper_init(hosts.c_str(),0,1000,0,0,0);
+        CPPUNIT_ASSERT(zh);
+
+        reSeed();
+
+        cycleNextServer();
+    }
+
+    void close()
+    {
+        zookeeper_close(zh);
+        zh = NULL;
+    }
+
+    bool isReconfig()
+    {
+        return zh->reconfig != 0;
+    }
+
+    /**
+     * Re-seed this client with its own previously generated seed so its
+     * random choices are unique and separate from the other clients.
+     */
+    void reSeed()
+    {
+        srandom(seed);
+        srand48(seed);
+    }
+
+    /**
+     * Get the server that this client is currently connected to.
+     */
+    string getServer()
+    {
+        const char* addrstring = zoo_get_current_server(zh);
+        return string(addrstring);
+    }
+
+    /**
+     * Get the server this client is currently connected to with no port
+     * specification.
+     */
+    string getServerNoPort()
+    {
+        string addrstring = getServer();
+
+        size_t found = addrstring.find(":");
+        CPPUNIT_ASSERT(found != string::npos);
+
+        return addrstring.substr(0, found);
+    }
+
+    /**
+     * Get the port of the server this client is currently connected to.
+     */
+    uint32_t getServerPort()
+    {
+        string addrstring = getServer();
+
+        size_t found = addrstring.find(":");
+        CPPUNIT_ASSERT(found != string::npos);
+
+        string portStr = addrstring.substr(found+1);
+
+        stringstream ss(portStr);
+        uint32_t port;
+        ss >> port;
+
+        CPPUNIT_ASSERT(port >= portOffset);
+
+        return port;
+    }
+
+    /**
+     * Cycle to the next available server on the next connect attempt. It also
+     * calls into getServer (above) to return the server connected to.
+     */ 
+    string cycleNextServer()
+    {
+        zoo_cycle_next_server(zh);
+        return getServer();
+    }
+
+    void cycleUntilServer(const string requested)
+    {
+        // Call cycleNextServer until the one it's connected to is the one
+        // specified (disregarding port).
+        string first;
+
+        while(true)
+        {
+            string next = cycleNextServer();
+            if (first.empty())
+            {
+                first = next;
+            } 
+            // Else we've looped around!
+            else if (first == next)
+            {
+                CPPUNIT_ASSERT(false);
+            }
+
+            // Strip port off
+            string server = getServerNoPort();
+
+            // If it matches the requested host we're now 'connected' to the right host
+            if (server == requested)
+            {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Set servers for this client.
+     */
+    void setServers(const string new_hosts)
+    {
+        int rc = zoo_set_servers(zh, new_hosts.c_str());
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    }
+
+    /**
+     * Set servers for this client and validate reconfig value matches expected.
+     */
+    void setServersAndVerifyReconfig(const string new_hosts, bool is_reconfig)
+    {
+        setServers(new_hosts);
+        CPPUNIT_ASSERT_EQUAL(is_reconfig, isReconfig());
+    }
+
+    /**
+     * Sets the server list this client connects to and, if this requires the
+     * client to be reconfigured (as dictated by internal client policy),
+     * triggers a call to cycleNextServer.
+     */
+    void setServersAndCycleIfNeeded(const string new_hosts)
+    {
+        setServers(new_hosts);
+        if (isReconfig())
+        {
+            cycleNextServer();
+        }
+    }
+};
+
+class Zookeeper_reconfig : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_reconfig);
+
+    // Test cases
+    CPPUNIT_TEST(testcycleNextServer);
+    CPPUNIT_TEST(testMigrateOrNot);
+    CPPUNIT_TEST(testMigrationCycle);
+
+    // In threaded mode each 'create' is a thread -- it's not practical to create
+    // 10,000 threads to test load balancing. The load balancing code can easily
+    // be tested in single threaded mode as concurrency doesn't affect the algorithm.
+#ifndef THREADED
+    CPPUNIT_TEST(testMigrateProbability);
+    CPPUNIT_TEST(testLoadBalancing);
+#endif
+
+    CPPUNIT_TEST_SUITE_END();
+
+    FILE *logfile;
+
+    double slackPercent;
+    static const int numClients = 10000;
+    static const int portOffset = 2000;
+
+    vector<Client> clients;
+    vector<uint32_t> numClientsPerHost;
+
+public:
+    Zookeeper_reconfig() :
+        slackPercent(10.0)
+    {
+      logfile = openlogfile("Zookeeper_reconfig");
+    }
+
+    ~Zookeeper_reconfig() 
+    {
+      if (logfile) 
+      {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp()
+    {
+        zoo_set_log_stream(logfile);
+        zoo_deterministic_conn_order(1);
+
+        numClientsPerHost.resize(numClients);
+    }
+
+    void tearDown()
+    {
+        for (int i = 0; i < clients.size(); i++)
+        {
+            clients.at(i).close();
+        }
+    }
+
+    /**
+     * Create a client with given connection host string and add to our internal
+     * vector of clients. These are disconnected and cleaned up in tearDown().
+     */
+    Client& createClient(const string hosts)
+    {
+        Client client(hosts, clients.size());
+        clients.push_back(client);
+
+        return clients.back();
+    }
+
+    /**
+     * Same as createClient(hosts), except it also takes the specific host that
+     * this client should simulate being connected to.
+     */
+    Client& createClient(const string hosts, const string host)
+    {
+        // Ensure requested host is in the list
+        size_t found = hosts.find(host);
+        CPPUNIT_ASSERT(found != hosts.npos);
+
+        Client client(hosts, clients.size());
+        client.cycleUntilServer(host);
+        clients.push_back(client);
+
+        return clients.back();
+    }
+
+    /**
+     * Create a connection host list starting at 'start' and stopping at 'stop'
+     * where start >= stop. This creates a connection string with host:port pairs
+     * separated by commas. The given 'octet' is the starting octet that is used
+     * as the last octet in the host's IP. This is decremented on each iteration. 
+     * Each port will be portOffset + octet.
+     */
+    string createHostList(uint32_t start, uint32_t stop = 1, uint32_t octet = 0)
+    {
+        if (octet == 0)
+        {
+            octet = start;
+        }
+
+        stringstream ss;
+
+        for (int i = start; i >= stop; i--, octet--)
+        {
+            ss << "10.10.10." << octet << ":" << portOffset + octet;
+
+            if (i > stop)
+            {
+                ss << ", ";
+            }
+        }
+
+        return ss.str();
+    }
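
For readers tracing the tests below, here is a minimal standalone sketch (an editorial illustration, not part of the commit) that rebuilds the string createHostList(3) would return, assuming portOffset = 2000 as defined above:

#include <cassert>
#include <sstream>
#include <string>

int main() {
    // Walk octet 3 down to 1; each entry is 10.10.10.<octet>:<2000 + octet>.
    std::ostringstream ss;
    for (int octet = 3; octet >= 1; --octet) {
        ss << "10.10.10." << octet << ":" << (2000 + octet);
        if (octet > 1) ss << ", ";
    }
    assert(ss.str() == "10.10.10.3:2003, 10.10.10.2:2002, 10.10.10.1:2001");
    return 0;
}
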
+
+    /**
+     * Gets the lower bound of the number of clients per server that we expect
+     * based on the probabilistic load balancing algorithm implemented by the
+     * client code.
+     */
+    double lowerboundClientsPerServer(int numClients, int numServers)
+    {
+        return (1 - slackPercent/100.0) * numClients / numServers;
+    }
+
+    /**
+     * Gets the upper bound of the number of clients per server that we expect
+     * based on the probabilistic load balancing algorithm implemented by the
+     * client code.
+     */
+    double upperboundClientsPerServer(int numClients, int numServers)
+    {
+        return (1 + slackPercent/100.0) * numClients / numServers;
+    }
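
As a quick sanity check of these bounds (editorial sketch, not part of the commit): for the 10,000 clients, 9 servers, and 10% slack used by testLoadBalancing below, each server should hold roughly 1111 clients, and the acceptable range works out to about [1000, 1222]:

#include <cassert>
#include <cmath>

int main() {
    const double slackPercent = 10.0;
    const int numClients = 10000, numServers = 9;
    // Same formulas as the two helpers above.
    const double lower = (1 - slackPercent / 100.0) * numClients / numServers;
    const double upper = (1 + slackPercent / 100.0) * numClients / numServers;
    assert(std::fabs(lower - 1000.0) < 1e-6);   // 0.9 * 10000 / 9
    assert(upper > 1222.0 && upper < 1223.0);   // 1.1 * 10000 / 9
    return 0;
}
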
+
+    /**
+     * Update all the clients to use a new list of servers. This also causes each
+     * client to cycle to the next server as needed (e.g. due to a reconfig), and
+     * updates the per-server client counts to reflect the change.
+     * 
+     * Afterwards it validates that every server has the expected number of
+     * clients based on the probabilistic load balancing algorithm.
+     */
+    void updateAllClientsAndServers(int start, int stop = 1)
+    {
+        string newServers = createHostList(start, stop);
+        int numServers = start - stop + 1;
+
+        for (int i = 0; i < numClients; i++) {
+
+            Client &client = clients.at(i);
+            client.reSeed();
+
+            client.setServersAndCycleIfNeeded(newServers);
+            numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
+        }
+
+        int offset = stop - 1;
+        for (int index = offset; index < numServers; index++) {
+
+            if (numClientsPerHost.at(index) > upperboundClientsPerServer(numClients, numServers))
+            {
+                cout << "INDEX=" << index << " too many -- actual=" << numClientsPerHost.at(index) 
+                     << " expected=" << upperboundClientsPerServer(numClients, numServers) << endl;
+            }
+
+
+            CPPUNIT_ASSERT(numClientsPerHost.at(index) <= upperboundClientsPerServer(numClients, numServers));
+
+            if (numClientsPerHost.at(index) < lowerboundClientsPerServer(numClients, numServers))
+            {
+                cout << "INDEX=" << index << " too few -- actual=" << numClientsPerHost.at(index) 
+                     << " expected=" << lowerboundClientsPerServer(numClients, numServers) << endl;
+            }
+
+            CPPUNIT_ASSERT(numClientsPerHost.at(index) >= lowerboundClientsPerServer(numClients, numServers));
+            numClientsPerHost.at(index) = 0; // prepare for next test
+        }
+    }
+
+    /*-------------------------------------------------------------------------*
+     * TESTCASES
+     *------------------------------------------------------------------------*/
+
+    /**
+     * Very basic sunny day test to ensure basic functionality of zoo_set_servers
+     * and zoo_cycle_next_server.
+     */
+    void testcycleNextServer()
+    {
+        const string initial_hosts = createHostList(10); // 2010..2001
+        const string new_hosts = createHostList(4);      // 2004..2001
+
+        Client &client = createClient(initial_hosts);
+
+        client.setServersAndVerifyReconfig(new_hosts, true);
+
+        for (int i = 0; i < 10; i++)
+        {
+            string next = client.cycleNextServer();
+        }
+    }
+
+    /**
+     * Test the migration policy implicit within the probabilistic load balancing
+     * algorithm the Client implements. Covers the corner cases where the list of
+     * servers is decreased, increased, or stays the same, combined with the
+     * currently connected server either appearing in the new configuration or
+     * not.
+     */
+    void testMigrateOrNot()
+    {
+        const string initial_hosts = createHostList(4); // 2004..2001
+
+        Client &client = createClient(initial_hosts, "10.10.10.3");
+
+        // Ensemble size decreasing, my server is in the new list
+        client.setServersAndVerifyReconfig(createHostList(3), false);
+
+        // Ensemble size decreasing, my server is NOT in the new list
+        client.setServersAndVerifyReconfig(createHostList(2), true);
+
+        // Ensemble size stayed the same, my server is NOT in the new list
+        client.setServersAndVerifyReconfig(createHostList(2), true);
+
+        // Ensemble size increased, my server is not in the new ensemble
+        client.setServers(createHostList(4));
+        client.cycleUntilServer("10.10.10.1");
+        client.setServersAndVerifyReconfig(createHostList(7,2), true);
+    }
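
Read together, the four checks above suggest a decision rule along the following lines. This is an editorial paraphrase of what the test expects, not the client's actual implementation; expectReconfig is a hypothetical helper, and the growing-ensemble case where the current server stays in the list is probabilistic and is exercised separately by testMigrateProbability below.

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical paraphrase: reconfig is expected whenever the currently
// connected server is absent from the new server list.
static bool expectReconfig(const std::string& current,
                           const std::vector<std::string>& newServers) {
    return std::find(newServers.begin(), newServers.end(), current)
           == newServers.end();
}

int main() {
    std::vector<std::string> three, two;
    three.push_back("10.10.10.3");
    three.push_back("10.10.10.2");
    three.push_back("10.10.10.1");
    two.push_back("10.10.10.2");
    two.push_back("10.10.10.1");

    assert(!expectReconfig("10.10.10.3", three)); // still present: no reconfig
    assert(expectReconfig("10.10.10.3", two));    // dropped: reconfig expected
    return 0;
}
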
+
+    /**
+     * This tests that as a client is in reconfig mode it will properly try to
+     * connect to all the new servers first. Then it will try to connect to all
+     * the 'old' servers that are staying in the new configuration. Finally it
+     * will fallback to the normal behavior of trying servers in round-robin.
+     */
+    void testMigrationCycle()
+    {
+        int num_initial = 4;
+        const string initial_hosts = createHostList(num_initial); // {2004..2001}
+
+        int num_new = 10;
+        string new_hosts = createHostList(12, 3);      // {2012..2003}
+
+        // servers from the old list that appear in the new list {2004..2003}
+        int num_staying = 2;
+        string oldStaying = createHostList(4, 3);
+
+        // servers in the new list that are not in the old list  {2012..2005}
+        int num_coming = 8;
+        string newComing = createHostList(12, 5);
+
+        // Ensemble is increasing in size and my server is not in the new
+        // ensemble: load on the old servers must be decreased, so the client
+        // must connect to one of the new servers (pNew = 1)
+        Client &client = createClient(initial_hosts, "10.10.10.1");
+        client.setServersAndVerifyReconfig(new_hosts, true);
+
+        // Since we're in reconfig mode, next connect should be from new list
+        // We should try all the new servers *BEFORE* trying any old servers
+        string seen;
+        for (int i = 0; i < num_coming; i++) {
+            string next = client.cycleNextServer();
+
+            // Assert next server is in the 'new' list
+            size_t found = newComing.find(next);
+            CPPUNIT_ASSERT(found != string::npos);
+
+            // Assert not in seen list then append
+            found = seen.find(next);
+            CPPUNIT_ASSERT(found == string::npos);
+            seen += next + ", ";
+        }
+
+        // Now it should start connecting to the old servers
+        seen.clear();
+        for (int i = 0; i < num_staying; i++) {
+            string next = client.cycleNextServer();
+
+            // Assert it's in the old list
+            size_t found = oldStaying.find(next);
+            CPPUNIT_ASSERT(found != string::npos);
+
+            // Assert not in seen list then append
+            found = seen.find(next);
+            CPPUNIT_ASSERT(found == string::npos);
+            seen += next + ", ";
+        }
+
+        // NOW it goes back to normal as we've tried all the new and old
+        string first = client.cycleNextServer();
+        for (int i = 0; i < num_new - 1; i++) {
+            client.cycleNextServer();
+        }
+
+        CPPUNIT_ASSERT_EQUAL(first, client.cycleNextServer());
+    }
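
The counts used above follow from simple set arithmetic; the sketch below (editorial, not part of the commit) just re-derives num_coming and num_staying from the old ensemble {1..4} and the new ensemble {3..12}:

#include <cassert>
#include <set>

int main() {
    std::set<int> oldSet, newSet;
    for (int i = 1; i <= 4; ++i) oldSet.insert(i);    // old ensemble: 2001..2004
    for (int i = 3; i <= 12; ++i) newSet.insert(i);   // new ensemble: 2003..2012

    int coming = 0, staying = 0;
    for (std::set<int>::const_iterator it = newSet.begin(); it != newSet.end(); ++it) {
        if (oldSet.count(*it)) staying++; else coming++;
    }

    // 8 brand-new servers are tried first, then the 2 servers that are staying,
    // and only then does the client fall back to round-robin over all 10
    // servers in the new list.
    assert(coming == 8 && staying == 2);
    assert(coming + staying == (int)newSet.size());
    return 0;
}
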
+
+    /**
+     * Test the migration probability to ensure that it conforms to our expected
+     * lower and upper bounds of the number of clients per server as we are 
+     * reconfigured.
+     * 
+     * In this case, the list of servers is increased and the client's server is
+     * in the new list. Whether to move or not depends on the ratio of the list
+     * sizes: with probability 1 - |old|/|new| the client disconnects.
+     * 
+     * In the test below there is a 1 - 9/10 = 1/10 chance of disconnecting.
+     */
+    void testMigrateProbability()
+    {
+        const string initial_hosts = createHostList(9); // 10.10.10.9:2009...10.10.10.1:2001
+        string new_hosts = createHostList(10); // 10.10.10.10:2010...10.10.10.1:2001
+
+        uint32_t numDisconnects = 0;
+        for (int i = 0; i < numClients; i++) {
+            Client &client = createClient(initial_hosts, "10.10.10.3");
+            client.setServers(new_hosts);
+            if (client.isReconfig())
+            {
+                numDisconnects++;
+            }
+        }
+
+        // Should be numClients/10 in expectation; here we only check the upper
+        // bound of numClients/10 plus slackPercent.
+        CPPUNIT_ASSERT(numDisconnects < upperboundClientsPerServer(numClients, 10));
+    }
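
The arithmetic behind the comment and assertion above, spelled out in a small editorial sketch (not part of the commit): with |old| = 9 and |new| = 10, each client disconnects with probability 1 - 9/10 = 0.1, so roughly 1,000 of the 10,000 clients should disconnect, and the assertion allows at most about 1,100 (the 10% slack upper bound):

#include <cassert>
#include <cmath>

int main() {
    const double pDisconnect = 1.0 - 9.0 / 10.0;                 // 0.1
    const double expected = 10000 * pDisconnect;                  // ~1000 disconnects
    const double allowedMax = (1 + 10.0 / 100.0) * 10000 / 10;    // 1100
    assert(std::fabs(expected - 1000.0) < 1e-6);
    assert(std::fabs(allowedMax - 1100.0) < 1e-6);
    return 0;
}
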
+
+    /**
+     * Tests the probabilistic load balancing algorithm implemented by the Client
+     * code. 
+     * 
+     * Test strategy:
+     * 
+     * (1) Start with 9 servers and 10,000 clients. Remove a server, update
+     *     everything, and ensure that the clients are redistributed properly.
+     * 
+     * (2) Remove two more nodes and repeat the same validations of proper client
+     *     redistribution. Ensure no clients are connected to the two removed
+     *     nodes.
+     * 
+     * (3) Remove the first server in the list and simultaneously add the three
+     *     previously removed servers. Ensure everything is redistributed and
+     *     no clients are connected to the one missing node.
+     * 
+     * (4) Add the one missing server back into the mix and validate.
+     */
+    void testLoadBalancing()
+    {
+        zoo_deterministic_conn_order(0);
+
+        int rc = ZOK;
+
+        uint32_t numServers = 9;
+        const string initial_hosts = createHostList(numServers); // 10.10.10.9:2009...10.10.10.1:2001
+
+        // Create connections to servers
+        for (int i = 0; i < numClients; i++) {
+            Client &client = createClient(initial_hosts);
+            numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
+        }
+
+        for (int i = 0; i < numServers; i++) {
+            CPPUNIT_ASSERT(numClientsPerHost.at(i) <= upperboundClientsPerServer(numClients, numServers));
+            CPPUNIT_ASSERT(numClientsPerHost.at(i) >= lowerboundClientsPerServer(numClients, numServers));
+            numClientsPerHost.at(i) = 0; // prepare for next test
+        }
+
+        // remove last server
+        numServers = 8;
+        updateAllClientsAndServers(numServers);
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
+
+        // Remove two more nodes
+        numServers = 6;
+        updateAllClientsAndServers(numServers);
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+1));
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+2));
+
+        // remove host 0 (first one in list) and add back 6, 7, and 8
+        numServers = 8;
+        updateAllClientsAndServers(numServers, 1);
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(0));
+
+        // add back host number 0
+        numServers = 9;
+        updateAllClientsAndServers(numServers);
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_reconfig);

Modified: zookeeper/trunk/src/c/tests/TestZookeeperClose.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestZookeeperClose.cc (original)
+++ zookeeper/trunk/src/c/tests/TestZookeeperClose.cc Sat Nov 17 14:03:03 2012
@@ -102,7 +102,7 @@ public: 
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         // This cannot be maintained properly CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter);
     }
     void testCloseUnconnected1()
@@ -128,7 +128,7 @@ public: 
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         // the close request sent?
         CPPUNIT_ASSERT_EQUAL(1,zkMock.counter);
     }
@@ -140,6 +140,7 @@ public: 
 
         zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
         CPPUNIT_ASSERT(zh!=0);
+        CPPUNIT_ASSERT_EQUAL(ZOO_NOTCONNECTED_STATE, zoo_state(zh));
 
         Mock_gettimeofday timeMock;
         
@@ -173,7 +174,7 @@ public: 
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         // the close request sent?
         CPPUNIT_ASSERT_EQUAL(1,(int)zkServer.closeSent);
     }
@@ -218,7 +219,7 @@ public: 
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
         // make sure the close request NOT sent
         CPPUNIT_ASSERT_EQUAL(0,(int)zkServer.closeSent);
     }
@@ -249,7 +250,7 @@ public: 
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
         // Cannot be maintained accurately: CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter);
         // threads
@@ -313,7 +314,7 @@ public: 
             // memory
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
             // threads
             CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
@@ -378,7 +379,7 @@ public: 
             // memory
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
-            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
             // threads
             CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
@@ -444,7 +445,7 @@ public: 
             // memory
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
-            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
             // threads
             CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));