svn commit: r1410731 [1/2] - in /zookeeper/trunk: ./ src/c/ src/c/include/
src/c/src/ src/c/tests/ src/docs/src/documentation/content/xdocs/
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/client/
src/java/test/org/apache/zookeep...
Author: fpj
Date: Sat Nov 17 14:03:03 2012
New Revision: 1410731
URL: http://svn.apache.org/viewvc?rev=1410731&view=rev
Log:
ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)
Added:
zookeeper/trunk/src/c/src/addrvec.c
zookeeper/trunk/src/c/src/addrvec.h
zookeeper/trunk/src/c/tests/TestReconfig.cc
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/c/Makefile.am
zookeeper/trunk/src/c/include/zookeeper.h
zookeeper/trunk/src/c/src/mt_adaptor.c
zookeeper/trunk/src/c/src/st_adaptor.c
zookeeper/trunk/src/c/src/zk_adaptor.h
zookeeper/trunk/src/c/src/zookeeper.c
zookeeper/trunk/src/c/tests/TestZookeeperClose.cc
zookeeper/trunk/src/c/tests/TestZookeeperInit.cc
zookeeper/trunk/src/c/tests/ZKMocks.cc
zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Nov 17 14:03:03 2012
@@ -6,6 +6,9 @@ BUGFIXES:
Backward compatible changes:
+NEW FEATURES:
+ ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)
+
BUGFIXES:
ZOOKEEPER-786. Exception in ZooKeeper.toString
Modified: zookeeper/trunk/src/c/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/Makefile.am?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/Makefile.am (original)
+++ zookeeper/trunk/src/c/Makefile.am Sat Nov 17 14:03:03 2012
@@ -19,7 +19,8 @@ libhashtable_la_SOURCES = $(HASHTABLE_SR
COMMON_SRC = src/zookeeper.c include/zookeeper.h include/zookeeper_version.h include/zookeeper_log.h\
src/recordio.c include/recordio.h include/proto.h \
src/zk_adaptor.h generated/zookeeper.jute.c \
- src/zookeeper_log.h src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c
+ src/zookeeper_log.h src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c \
+ src/addrvec.h src/addrvec.c
# These are the symbols (classes, mostly) we want to export from our library.
EXPORT_SYMBOLS = '(zoo_|zookeeper_|zhandle|Z|format_log_message|log_message|logLevel|deallocate_|zerror|is_unrecoverable)'
@@ -70,13 +71,26 @@ endif
EXTRA_DIST+=$(wildcard ${srcdir}/tests/*.cc) $(wildcard ${srcdir}/tests/*.h) \
${srcdir}/tests/wrappers.opt ${srcdir}/tests/wrappers-mt.opt
-TEST_SOURCES = tests/TestDriver.cc tests/LibCMocks.cc tests/LibCSymTable.cc \
- tests/MocksBase.cc tests/ZKMocks.cc tests/Util.cc tests/ThreadingUtil.cc \
+# These tests are ordered so that each builds upon basic functionality
+# exercised in prior tests. E.g. the most basic functionality is covered by
+# TestZookeeperInit and TestZookeeperClose, so those run first as a
+# foundation, with more complex test suites to follow.
+TEST_SOURCES = \
+ tests/TestDriver.cc \
+ tests/LibCMocks.cc \
+ tests/LibCSymTable.cc \
+ tests/MocksBase.cc \
+ tests/ZKMocks.cc \
+ tests/Util.cc \
+ tests/ThreadingUtil.cc \
+ tests/TestZookeeperInit.cc \
+ tests/TestZookeeperClose.cc \
+ tests/TestReconfig.cc \
tests/TestClientRetry.cc \
- tests/TestOperations.cc tests/TestZookeeperInit.cc \
- tests/TestZookeeperClose.cc tests/TestClient.cc \
- tests/TestMulti.cc tests/TestWatchers.cc
-
+ tests/TestOperations.cc \
+ tests/TestMulti.cc \
+ tests/TestClient.cc \
+ tests/TestWatchers.cc
SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Sat Nov 17 14:03:03 2012
@@ -184,6 +184,7 @@ extern ZOOAPI const int ZOO_AUTH_FAILED_
extern ZOOAPI const int ZOO_CONNECTING_STATE;
extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
extern ZOOAPI const int ZOO_CONNECTED_STATE;
+extern ZOOAPI const int ZOO_NOTCONNECTED_STATE;
// @}
/**
@@ -450,6 +451,63 @@ ZOOAPI zhandle_t *zookeeper_init(const c
int recv_timeout, const clientid_t *clientid, void *context, int flags);
/**
+ * \brief update the list of servers this client will connect to.
+ *
+ * This method allows a client to update the connection string by providing
+ * a new comma separated list of host:port pairs, each corresponding to a
+ * ZooKeeper server.
+ *
+ * This function invokes a probabilistic load-balancing algorithm which may cause
+ * the client to disconnect from its current host to achieve expected uniform
+ * connections per server in the new list. If the current host to which the
+ * client is connected is not in the new list, this call will always cause the
+ * connection to be dropped. Otherwise, the decision is based on whether the
+ * number of servers has increased or decreased and by how much.
+ *
+ * If the connection is dropped, the client moves to a special "reconfig" mode
+ * where it chooses a new server to connect to using the probabilistic algorithm.
+ * After finding a server or exhaustively trying all the servers in the new list,
+ * the client moves back to the normal mode of operation where it will pick an
+ * arbitrary server from the 'host' string.
+ *
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
+ * protocol and its evaluation.
+ *
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZSYSTEMERROR - a system (OS) error occurred; it's worth checking errno to get details
+ */
+ZOOAPI int zoo_set_servers(zhandle_t *zh, const char *hosts);
+
+/**
+ * \brief cycle to the next server on the next connection attempt.
+ *
+ * Note: typically this method should NOT be used outside of testing.
+ *
+ * This method allows a client to cycle through the list of servers in its
+ * connection pool to be used on the next connection attempt. This function does
+ * not actually trigger a connection or state change in any way. Its purpose is
+ * to allow testing of changing servers on the fly and of the probabilistic
+ * load-balancing algorithm.
+ */
+ZOOAPI void zoo_cycle_next_server(zhandle_t *zh);
+
+/**
+ * \brief get current host:port this client is connecting/connected to.
+ *
+ * Note: typically this method should NOT be used outside of testing.
+ *
+ * This method allows a client to get the host:port that it is either in the
+ * process of connecting to or is currently connected to. This is mainly used
+ * for testing purposes but might also come in handy as a general-purpose tool
+ * for other clients.
+ */
+ZOOAPI const char* zoo_get_current_server(zhandle_t* zh);
+
+/**
* \brief close the zookeeper handle and free up any resources.
*
* After this call, the client session will no longer be valid. The function
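A minimal usage sketch of the three new calls (hypothetical addresses; assumes a
zhandle_t obtained from zookeeper_init and elides all error handling beyond the
return code):

    // Push a new ensemble list; per the algorithm described above, this may
    // drop the current connection in order to rebalance load.
    int rc = zoo_set_servers(zh, "10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181");
    if (rc != ZOK)
        fprintf(stderr, "zoo_set_servers: %s\n", zerror(rc));

    // Testing hooks: advance to the next candidate server and inspect it.
    zoo_cycle_next_server(zh);
    printf("next server: %s\n", zoo_get_current_server(zh));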
Added: zookeeper/trunk/src/c/src/addrvec.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.c?rev=1410731&view=auto
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.c (added)
+++ zookeeper/trunk/src/c/src/addrvec.c Sat Nov 17 14:03:03 2012
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+
+#include "addrvec.h"
+
+#define ADDRVEC_DEFAULT_GROW_AMOUNT 16
+
+void addrvec_init(addrvec_t *avec)
+{
+ assert(avec);
+ avec->next = 0;
+ avec->count = 0;
+ avec->capacity = 0;
+ avec->data = NULL;
+}
+
+void addrvec_free(addrvec_t *avec)
+{
+ if (avec == NULL)
+ {
+ return;
+ }
+
+ avec->next = 0;
+ avec->count = 0;
+ avec->capacity = 0;
+ if (avec->data) {
+ free(avec->data);
+ avec->data = NULL;
+ }
+}
+
+int addrvec_alloc(addrvec_t *avec)
+{
+ addrvec_init(avec);
+ return addrvec_grow_default(avec);
+}
+
+int addrvec_alloc_capacity(addrvec_t* avec, uint32_t capacity)
+{
+ addrvec_init(avec);
+ return addrvec_grow(avec, capacity);
+}
+
+int addrvec_grow(addrvec_t *avec, uint32_t grow_amount)
+{
+ assert(avec);
+
+ if (grow_amount == 0)
+ {
+ return 0;
+ }
+
+ // Save off old data and capacity in case there is a realloc failure
+ unsigned int old_capacity = avec->capacity;
+ struct sockaddr_storage *old_data = avec->data;
+
+ avec->capacity += grow_amount;
+ avec->data = realloc(avec->data, sizeof(*avec->data) * avec->capacity);
+ if (avec->data == NULL)
+ {
+ avec->capacity = old_capacity;
+ avec->data = old_data;
+ errno = ENOMEM;
+ return 1;
+ }
+
+ return 0;
+}
+
+int addrvec_grow_default(addrvec_t *avec)
+{
+ return addrvec_grow(avec, ADDRVEC_DEFAULT_GROW_AMOUNT);
+}
+
+static int addrvec_grow_if_full(addrvec_t *avec)
+{
+ assert(avec);
+ if (avec->count == avec->capacity)
+ {
+ int rc = addrvec_grow_default(avec);
+ if (rc != 0)
+ {
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
+int addrvec_contains(const addrvec_t *avec, const struct sockaddr_storage *addr)
+{
+ if (!avec || !addr)
+ {
+ return 0;
+ }
+
+ int i = 0;
+ for (i = 0; i < avec->count; i++)
+ {
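+        // NOTE: compares only the first INET_ADDRSTRLEN (16) bytes, which
+        // covers a full sockaddr_in but only a prefix of a sockaddr_in6.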
+ if(memcmp(&avec->data[i], addr, INET_ADDRSTRLEN) == 0)
+ return 1;
+ }
+
+ return 0;
+}
+
+int addrvec_append(addrvec_t *avec, const struct sockaddr_storage *addr)
+{
+ assert(avec);
+ assert(addr);
+
+ int rc = addrvec_grow_if_full(avec);
+ if (rc != 0)
+ {
+ return rc;
+ }
+
+ // Copy addrinfo into address list
+ memcpy(avec->data + avec->count, addr, sizeof(*addr));
+ ++avec->count;
+
+ return 0;
+}
+
+int addrvec_append_addrinfo(addrvec_t *avec, const struct addrinfo *addrinfo)
+{
+ assert(avec);
+ assert(addrinfo);
+
+ int rc = addrvec_grow_if_full(avec);
+ if (rc != 0)
+ {
+ return rc;
+ }
+
+ // Copy addrinfo into address list
+ memcpy(avec->data + avec->count, addrinfo->ai_addr, addrinfo->ai_addrlen);
+ ++avec->count;
+
+ return 0;
+}
+
+void addrvec_shuffle(addrvec_t *avec)
+{
+ int i = 0;
+ for (i = avec->count - 1; i > 0; --i) {
+ long int j = random()%(i+1);
+ if (i != j) {
+ struct sockaddr_storage t = avec->data[i];
+ avec->data[i] = avec->data[j];
+ avec->data[j] = t;
+ }
+ }
+}
+
+int addrvec_hasnext(const addrvec_t *avec)
+{
+ return avec->count > 0 && (avec->next < avec->count);
+}
+
+int addrvec_atend(const addrvec_t *avec)
+{
+ return avec->count > 0 && avec->next >= avec->count;
+}
+
+void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next)
+{
+ // If we're at the end of the list, then reset index to start
+ if (addrvec_atend(avec))
+ {
+ avec->next = 0;
+ }
+
+ if (!addrvec_hasnext(avec))
+ {
+ next = NULL;
+ return;
+ }
+
+ *next = avec->data[avec->next++];
+}
+
+int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2)
+{
+ if (a1->count != a2->count)
+ {
+ return 0;
+ }
+
+ int i;
+ for (i = 0; i < a1->count; ++i)
+ {
+ if (!addrvec_contains(a2, &a1->data[i]))
+ return 0;
+ }
+
+ return 1;
+}
Added: zookeeper/trunk/src/c/src/addrvec.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.h?rev=1410731&view=auto
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.h (added)
+++ zookeeper/trunk/src/c/src/addrvec.h Sat Nov 17 14:03:03 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ADDRVEC_H_
+#define ADDRVEC_H_
+
+#include <inttypes.h>
+
+#ifndef WIN32
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#endif
+
+/**
+ * This structure represents a list of addresses. It stores the count of the
+ * number of elements that have been inserted via calls to addrvec_append and
+ * addrvec_append_addrinfo. It also has a capacity field for the number of
+ * addresses it has the ability to hold without needing to be enlarged.
+ */
+typedef struct _addrvec {
+ unsigned int next; // next index to use
+ unsigned int count; // number of addresses in this list
+    unsigned int capacity;         // number of addresses this list can hold
+ struct sockaddr_storage *data; // list of addresses
+} addrvec_t;
+
+/**
+ * Initialize an addrvec by clearing out all its state.
+ */
+void addrvec_init(addrvec_t *avec);
+
+/**
+ * Free any memory used internally by an addrvec
+ */
+void addrvec_free(addrvec_t *avec);
+
+/**
+ * Allocate an addrvec with a default capacity (16)
+ */
+int addrvec_alloc(addrvec_t *avec);
+
+/**
+ * Allocates an addrvec with a specified capacity
+ */
+int addrvec_alloc_capacity(addrvec_t *avec, uint32_t capacity);
+
+/**
+ * Grow an addrvec by the specified amount. This will increase the capacity
+ * of the vector and not the contents.
+ */
+int addrvec_grow(addrvec_t *avec, uint32_t grow_amount);
+
+/**
+ * Similar to addrvec_grow but uses a default growth amount of 16.
+ */
+int addrvec_grow_default(addrvec_t *avec);
+
+/**
+ * Check if an addrvec contains the specified sockaddr_storage value.
+ * \returns 1 if it contains the value and 0 otherwise.
+ */
+int addrvec_contains(const addrvec_t *avec, const struct sockaddr_storage *addr);
+
+/**
+ * Append the given sockaddr_storage pointer into the addrvec. The contents of
+ * the given 'addr' are copied into the addrvec via memcpy.
+ */
+int addrvec_append(addrvec_t *avec, const struct sockaddr_storage *addr);
+
+/**
+ * Append the given addrinfo pointer into the addrvec. The contents of the given
+ * 'addrinfo' are copied into the addrvec via memcpy.
+ */
+int addrvec_append_addrinfo(addrvec_t *avec, const struct addrinfo *addrinfo);
+
+/**
+ * Shuffle the addrvec so that its internal list of addresses is randomized.
+ * Uses random() and assumes it has been properly seeded.
+ */
+void addrvec_shuffle(addrvec_t *avec);
+
+/**
+ * Determine if the addrvec has a next element (i.e. it is safe to call addrvec_next)
+ *
+ * \returns 1 if it has a next element and 0 otherwise
+ */
+int addrvec_hasnext(const addrvec_t *avec);
+
+/**
+ * Determine if the addrvec is at the end or not. Specifically, this means a
+ * subsequent call to addrvec_next will loop around to the start again.
+ */
+int addrvec_atend(const addrvec_t *avec);
+
+/**
+ * Get the next entry from the addrvec and update the associated index.
+ *
+ * If the current index points at (or after) the last element in the vector then
+ * it will loop back around and start at the beginning of the list.
+ */
+void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next);
+
+/**
+ * Compare two addrvecs for equality.
+ *
+ * \returns 1 if the contents of the two lists are identical and 0 otherwise.
+ */
+int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2);
+
+#endif // ADDRVEC_H_
+
+
+
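A minimal lifecycle sketch for the API above (assumes 'addr' is a populated
struct sockaddr_storage supplied by the caller):

    addrvec_t avec;
    struct sockaddr_storage next;

    if (addrvec_alloc(&avec) == 0) {      // default capacity of 16
        addrvec_append(&avec, &addr);     // copies addr into the vector
        addrvec_shuffle(&avec);           // randomize order; seed random() first
        while (addrvec_hasnext(&avec))
            addrvec_next(&avec, &next);   // advances the internal cursor
        addrvec_free(&avec);              // releases the backing array
    }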
Modified: zookeeper/trunk/src/c/src/mt_adaptor.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/mt_adaptor.c?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/mt_adaptor.c (original)
+++ zookeeper/trunk/src/c/src/mt_adaptor.c Sat Nov 17 14:03:03 2012
@@ -255,6 +255,7 @@ int adaptor_init(zhandle_t *zh)
zh->adaptor_priv = adaptor_threads;
pthread_mutex_init(&zh->to_process.lock,0);
pthread_mutex_init(&adaptor_threads->zh_lock,0);
+ pthread_mutex_init(&adaptor_threads->reconfig_lock,0);
// to_send must be recursive mutex
pthread_mutexattr_init(&recursive_mx_attr);
pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
@@ -515,6 +516,19 @@ __attribute__((constructor)) int32_t get
return fetch_and_add(&xid,1);
}
+void lock_reconfig(struct _zhandle *zh)
+{
+ struct adaptor_threads *adaptor = zh->adaptor_priv;
+ if(adaptor)
+ pthread_mutex_lock(&adaptor->reconfig_lock);
+}
+void unlock_reconfig(struct _zhandle *zh)
+{
+ struct adaptor_threads *adaptor = zh->adaptor_priv;
+ if(adaptor)
+ pthread_mutex_unlock(&adaptor->reconfig_lock);
+}
+
void enter_critical(zhandle_t* zh)
{
struct adaptor_threads *adaptor = zh->adaptor_priv;
Modified: zookeeper/trunk/src/c/src/st_adaptor.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/st_adaptor.c?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/st_adaptor.c (original)
+++ zookeeper/trunk/src/c/src/st_adaptor.c Sat Nov 17 14:03:03 2012
@@ -95,5 +95,9 @@ int32_t get_xid()
}
return xid++;
}
+
+void lock_reconfig(struct _zhandle *zh){}
+void unlock_reconfig(struct _zhandle *zh){}
+
void enter_critical(zhandle_t* zh){}
void leave_critical(zhandle_t* zh){}
Modified: zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ zookeeper/trunk/src/c/src/zk_adaptor.h Sat Nov 17 14:03:03 2012
@@ -28,6 +28,7 @@
#endif
#include "zookeeper.h"
#include "zk_hashtable.h"
+#include "addrvec.h"
/* predefined xid's values recognized as special by the server */
#define WATCHER_EVENT_XID -1
@@ -156,10 +157,11 @@ struct prime_struct {
struct adaptor_threads {
pthread_t io;
pthread_t completion;
- int threadsToWait; // barrier
- pthread_cond_t cond; // barrier's conditional
- pthread_mutex_t lock; // ... and a lock
- pthread_mutex_t zh_lock; // critical section lock
+ int threadsToWait; // barrier
+ pthread_cond_t cond; // barrier's conditional
+ pthread_mutex_t lock; // ... and a lock
+ pthread_mutex_t zh_lock; // critical section lock
+ pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
#ifdef WIN32
SOCKET self_pipe[2];
#else
@@ -179,52 +181,71 @@ typedef struct _auth_list_head {
/**
* This structure represents the connection to zookeeper.
*/
-
struct _zhandle {
#ifdef WIN32
- SOCKET fd; /* the descriptor used to talk to zookeeper */
+ SOCKET fd; // the descriptor used to talk to zookeeper
#else
- int fd; /* the descriptor used to talk to zookeeper */
+ int fd; // the descriptor used to talk to zookeeper
#endif
- char *hostname; /* the hostname of zookeeper */
- struct sockaddr_storage *addrs; /* the addresses that correspond to the hostname */
- int addrs_count; /* The number of addresses in the addrs array */
- watcher_fn watcher; /* the registered watcher */
- struct timeval last_recv; /* The time that the last message was received */
- struct timeval last_send; /* The time that the last message was sent */
- struct timeval last_ping; /* The time that the last PING was sent */
- struct timeval next_deadline; /* The time of the next deadline */
- int recv_timeout; /* The maximum amount of time that can go by without
- receiving anything from the zookeeper server */
- buffer_list_t *input_buffer; /* the current buffer being read in */
- buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
- buffer_head_t to_send; /* The packets queued to send */
- completion_head_t sent_requests; /* The outstanding requests */
- completion_head_t completions_to_process; /* completions that are ready to run */
- int connect_index; /* The index of the address to connect to */
- clientid_t client_id;
- long long last_zxid;
- int outstanding_sync; /* Number of outstanding synchronous requests */
- struct _buffer_list primer_buffer; /* The buffer used for the handshake at the start of a connection */
- struct prime_struct primer_storage; /* the connect response */
- char primer_storage_buffer[40]; /* the true size of primer_storage */
- volatile int state;
- void *context;
- auth_list_head_t auth_h; /* authentication data list */
+
+ // Hostlist and list of addresses
+ char *hostname; // hostname contains list of zookeeper servers to connect to
+ struct sockaddr_storage addr_cur; // address of server we're currently connecting/connected to
+
+ addrvec_t addrs; // current list of addresses we're connected to
+ addrvec_t addrs_old; // old list of addresses that we are no longer connected to
+ addrvec_t addrs_new; // new list of addresses to connect to if we're reconfiguring
+
+    int reconfig;                       // Are we in the process of reconfiguring the cluster's ensemble?
+    double pOld, pNew;                  // Probability for selecting between 'addrs_old' and 'addrs_new'
+    int delay;                          // Delay the next connect attempt (set after exhausting all servers)
+
+ watcher_fn watcher; // the registered watcher
+
+ // Message timings
+ struct timeval last_recv; // time last message was received
+ struct timeval last_send; // time last message was sent
+ struct timeval last_ping; // time last PING was sent
+ struct timeval next_deadline; // time of the next deadline
+ int recv_timeout; // max receive timeout for messages from server
+
+ // Buffers
+ buffer_list_t *input_buffer; // current buffer being read in
+ buffer_head_t to_process; // buffers that have been read and ready to be processed
+ buffer_head_t to_send; // packets queued to send
+ completion_head_t sent_requests; // outstanding requests
+ completion_head_t completions_to_process; // completions that are ready to run
+ int outstanding_sync; // number of outstanding synchronous requests
+
+ // State info
+ volatile int state; // Current zookeeper state
+ void *context; // client-side provided context
+ clientid_t client_id; // client-id
+ long long last_zxid; // last zookeeper ID
+ auth_list_head_t auth_h; // authentication data list
+
+ // Primer storage
+ struct _buffer_list primer_buffer; // The buffer used for the handshake at the start of a connection
+ struct prime_struct primer_storage; // the connect response
+ char primer_storage_buffer[40]; // the true size of primer_storage
+
/* zookeeper_close is not reentrant because it de-allocates the zhandler.
* This guard variable is used to defer the destruction of zhandle till
* right before top-level API call returns to the caller */
int32_t ref_counter;
volatile int close_requested;
void *adaptor_priv;
+
/* Used for debugging only: non-zero value indicates the time when the zookeeper_process
* call returned while there was at least one unprocessed server response
* available in the socket recv buffer */
struct timeval socket_readable;
-
+
+ // Watchers
zk_hashtable* active_node_watchers;
zk_hashtable* active_exist_watchers;
zk_hashtable* active_child_watchers;
+
/** used for chroot path at the client side **/
char *chroot;
};
@@ -246,13 +267,19 @@ void free_duplicate_path(const char* fre
void zoo_lock_auth(zhandle_t *zh);
void zoo_unlock_auth(zhandle_t *zh);
+// ensemble reconfigure access guards
+void lock_reconfig(struct _zhandle *zh);
+void unlock_reconfig(struct _zhandle *zh);
+
// critical section guards
void enter_critical(zhandle_t* zh);
void leave_critical(zhandle_t* zh);
+
// zhandle object reference counting
void api_prolog(zhandle_t* zh);
int api_epilog(zhandle_t *zh, int rc);
int32_t get_xid();
+
// returns the new value of the ref counter
int32_t inc_ref_counter(zhandle_t* zh,int i);
Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Sat Nov 17 14:03:03 2012
@@ -74,6 +74,8 @@ const int ZOO_AUTH_FAILED_STATE = AUTH_F
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
+const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
+
static __attribute__ ((unused)) const char* state2String(int state){
switch(state){
case 0:
@@ -176,7 +178,6 @@ typedef struct _completion_list {
const char*err2string(int err);
static int queue_session_event(zhandle_t *zh, int state);
static const char* format_endpoint_info(const struct sockaddr_storage* ep);
-static const char* format_current_endpoint_info(zhandle_t* zh);
/* deserialize forward declarations */
static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
@@ -398,14 +399,12 @@ static void destroy(zhandle_t *zh)
if (zh->fd != -1) {
close(zh->fd);
zh->fd = -1;
+ memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
zh->state = 0;
}
- if (zh->addrs != 0) {
- free(zh->addrs);
- zh->addrs = NULL;
- }
+ addrvec_free(&zh->addrs);
- if (zh->chroot != 0) {
+ if (zh->chroot != NULL) {
free(zh->chroot);
zh->chroot = NULL;
}
@@ -429,6 +428,7 @@ static void setup_random()
close(fd);
}
srandom(seed);
+ srand48(seed);
#endif
}
@@ -456,31 +456,67 @@ static int getaddrinfo_errno(int rc) {
#endif
/**
- * fill in the addrs array of the zookeeper servers in the zhandle. after filling
- * them in, we will permute them for load balancing.
+ * Count the number of hosts in the connection host string. This assumes a
+ * well-formed connection string in which each host is separated by a comma.
*/
-int getaddrs(zhandle_t *zh)
+static int count_hosts(char *hosts)
{
- char *hosts = strdup(zh->hostname);
- char *host;
- char *strtok_last;
- struct sockaddr_storage *addr;
- int i;
- int rc;
- int alen = 0; /* the allocated length of the addrs array */
+ if (!hosts || strlen(hosts) == 0) {
+ return 0;
+ }
+
+ uint32_t count = 0;
+ char *loc = hosts;
+
+ while ((loc = strchr(loc, ','))) {
+ count++;
+ loc+=1;
+ }
+
+ return count+1;
+}
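For instance, two separators yield three hosts under this scheme (a sketch;
count_hosts is static to zookeeper.c):

    char hosts[] = "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002";
    char empty[] = "";
    int three = count_hosts(hosts);   // == 3: two commas => three hosts
    int zero  = count_hosts(empty);   // == 0: empty string holds no hosts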
+
+/**
+ * Resolve hosts and populate provided address vector with shuffled results.
+ * The contents of the provided address vector will be initialized to an
+ * empty state.
+ */
+int resolve_hosts(const char *hosts_in, addrvec_t *avec)
+{
+ int rc = ZOK;
- zh->addrs_count = 0;
- if (zh->addrs) {
- free(zh->addrs);
- zh->addrs = 0;
+ if (hosts_in == NULL || avec == NULL) {
+ return ZBADARGUMENTS;
}
- if (!hosts) {
- LOG_ERROR(("out of memory"));
+
+ // initialize address vector
+ addrvec_init(avec);
+
+ char *hosts = strdup(hosts_in);
+ if (hosts == NULL) {
+ LOG_ERROR(("out of memory"));
errno=ENOMEM;
- return ZSYSTEMERROR;
+ rc=ZSYSTEMERROR;
+ goto fail;
}
- zh->addrs = 0;
- host=strtok_r(hosts, ",", &strtok_last);
+
+ int num_hosts = count_hosts(hosts);
+ if (num_hosts == 0) {
+ free(hosts);
+ return ZOK;
+ }
+
+ // Allocate list inside avec
+ rc = addrvec_alloc_capacity(avec, num_hosts);
+ if (rc != 0) {
+ LOG_ERROR(("out of memory"));
+ errno=ENOMEM;
+ rc=ZSYSTEMERROR;
+ goto fail;
+ }
+
+ char *strtok_last;
+ char * host = strtok_r(hosts, ",", &strtok_last);
while(host) {
char *port_spec = strrchr(host, ':');
char *end_port_spec;
@@ -516,26 +552,25 @@ int getaddrs(zhandle_t *zh)
goto fail;
}
- /* Setup the address array */
+ // Setup the address array
for(ptr = he->h_addr_list;*ptr != 0; ptr++) {
- if (zh->addrs_count == alen) {
- alen += 16;
- zh->addrs = realloc(zh->addrs, sizeof(*zh->addrs)*alen);
- if (zh->addrs == 0) {
+            if (avec->count == avec->capacity) {
+                rc = addrvec_grow_default(avec);
+ if (rc != 0) {
LOG_ERROR(("out of memory"));
errno=ENOMEM;
rc=ZSYSTEMERROR;
goto fail;
}
}
- addr = &zh->addrs[zh->addrs_count];
+            addr = &avec->data[avec->count];
addr4 = (struct sockaddr_in*)addr;
addr->ss_family = he->h_addrtype;
if (addr->ss_family == AF_INET) {
addr4->sin_port = htons(port);
memset(&addr4->sin_zero, 0, sizeof(addr4->sin_zero));
memcpy(&addr4->sin_addr, *ptr, he->h_length);
- zh->addrs_count++;
+                avec->count++;
}
#if defined(AF_INET6)
else if (addr->ss_family == AF_INET6) {
@@ -546,12 +581,12 @@ int getaddrs(zhandle_t *zh)
addr6->sin6_scope_id = 0;
addr6->sin6_flowinfo = 0;
memcpy(&addr6->sin6_addr, *ptr, he->h_length);
- zh->addrs_count++;
+                avec->count++;
}
#endif
else {
LOG_WARN(("skipping unknown address family %x for %s",
- addr->ss_family, zh->hostname));
+ addr->ss_family, hosts_in));
}
}
host = strtok_r(0, ",", &strtok_last);
@@ -607,32 +642,27 @@ int getaddrs(zhandle_t *zh)
for (res = res0; res; res = res->ai_next) {
// Expand address list if needed
- if (zh->addrs_count == alen) {
- void *tmpaddr;
- alen += 16;
- tmpaddr = realloc(zh->addrs, sizeof(*zh->addrs)*alen);
- if (tmpaddr == 0) {
+ if (avec->count == avec->capacity) {
+ rc = addrvec_grow_default(avec);
+ if (rc != 0) {
LOG_ERROR(("out of memory"));
errno=ENOMEM;
rc=ZSYSTEMERROR;
goto fail;
}
- zh->addrs=tmpaddr;
}
// Copy addrinfo into address list
- addr = &zh->addrs[zh->addrs_count];
switch (res->ai_family) {
case AF_INET:
#if defined(AF_INET6)
case AF_INET6:
#endif
- memcpy(addr, res->ai_addr, res->ai_addrlen);
- ++zh->addrs_count;
+ addrvec_append_addrinfo(avec, res);
break;
default:
LOG_WARN(("skipping unknown address family %x for %s",
- res->ai_family, zh->hostname));
+ res->ai_family, hosts_in));
break;
}
}
@@ -647,25 +677,179 @@ int getaddrs(zhandle_t *zh)
if(!disable_conn_permute){
setup_random();
- /* Permute */
- for (i = zh->addrs_count - 1; i > 0; --i) {
- long int j = random()%(i+1);
- if (i != j) {
- struct sockaddr_storage t = zh->addrs[i];
- zh->addrs[i] = zh->addrs[j];
- zh->addrs[j] = t;
+ addrvec_shuffle(avec);
+ }
+
+ return ZOK;
+
+fail:
+ addrvec_free(avec);
+
+ if (hosts) {
+ free(hosts);
+ hosts = NULL;
+ }
+
+ return rc;
+}
+
+/**
+ * Updates the list of servers and determine if changing connections is necessary.
+ * Permutes server list for proper load balancing.
+ *
+ * Changing connections is necessary if one of the following holds:
+ * a) the server this client is currently connected to is not in the new address list.
+ * Otherwise (if currentHost is in the new list):
+ * b) the number of servers in the cluster is increasing - in this case the load
+ * on currentHost should decrease, which means that SOME of the clients
+ * connected to it will migrate to the new servers. The decision whether this
+ * client migrates or not is probabilistic so that the expected number of
+ * clients connected to each server is the same.
+ *
+ * If reconfig is set to true, the function sets pOld and pNew that correspond
+ * to the probability of migrating to one of the new servers or to one of the
+ * old servers (migrating to one of the old servers is done only if our
+ * client's currentHost is not in the new list).
+ *
+ * See zoo_cycle_next_server for the selection logic.
+ *
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
+ * protocol and its evaluation,
+ */
+int update_addrs(zhandle_t *zh)
+{
+ // Verify we have a valid handle
+ if (zh == NULL) {
+ return ZBADARGUMENTS;
+ }
+
+ // zh->hostname should always be set
+ if (zh->hostname == NULL)
+ {
+ return ZSYSTEMERROR;
+ }
+
+ // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+ lock_reconfig(zh);
+
+ int rc = ZOK;
+ char *hosts = NULL;
+
+ // Copy zh->hostname for local use
+ hosts = strdup(zh->hostname);
+ if (hosts == NULL) {
+ rc = ZSYSTEMERROR;
+ goto fail;
+ }
+
+ addrvec_t resolved = { 0 };
+ rc = resolve_hosts(hosts, &resolved);
+ if (rc != ZOK)
+ {
+ goto fail;
+ }
+
+    // If the addrvec list is identical to the last time we ran, don't do anything
+ if (addrvec_eq(&zh->addrs, &resolved))
+ {
+ goto fail;
+ }
+
+ // Is the server we're connected to in the new resolved list?
+ int found_current = addrvec_contains(&resolved, &zh->addr_cur);
+
+ // Clear out old and new address lists
+ zh->reconfig = 1;
+ addrvec_free(&zh->addrs_old);
+ addrvec_free(&zh->addrs_new);
+
+ // Divide server list into addrs_old if in previous list and addrs_new if not
+ int i = 0;
+ for (i = 0; i < resolved.count; i++)
+ {
+ struct sockaddr_storage *resolved_address = &resolved.data[i];
+ if (addrvec_contains(&zh->addrs, resolved_address))
+ {
+ rc = addrvec_append(&zh->addrs_old, resolved_address);
+ if (rc != ZOK)
+ {
+ goto fail;
+ }
+ }
+ else {
+ rc = addrvec_append(&zh->addrs_new, resolved_address);
+ if (rc != ZOK)
+ {
+ goto fail;
}
}
}
- return ZOK;
+
+ int num_old = zh->addrs_old.count;
+ int num_new = zh->addrs_new.count;
+
+ // Number of servers increased
+ if (num_old + num_new > zh->addrs.count)
+ {
+ if (found_current) {
+ // my server is in the new config, but load should be decreased.
+ // Need to decide if the client is moving to one of the new servers
+ if (drand48() <= (1 - ((double)zh->addrs.count) / (num_old + num_new))) {
+ zh->pNew = 1;
+ zh->pOld = 0;
+ } else {
+ // do nothing special -- stay with the current server
+ zh->reconfig = 0;
+ }
+ } else {
+ // my server is not in the new config, and load on old servers must
+ // be decreased, so connect to one of the new servers
+ zh->pNew = 1;
+ zh->pOld = 0;
+ }
+ }
+
+ // Number of servers stayed the same or decreased
+ else {
+ if (found_current) {
+ // my server is in the new config, and load should be increased, so
+ // stay with this server and do nothing special
+ zh->reconfig = 0;
+ } else {
+ zh->pOld = ((double) (num_old * (zh->addrs.count - (num_old + num_new)))) / ((num_old + num_new) * (zh->addrs.count - num_old));
+ zh->pNew = 1 - zh->pOld;
+ }
+ }
+
+ addrvec_free(&zh->addrs);
+ zh->addrs = resolved;
+
+ // If we need to do a reconfig and we're currently connected to a server,
+ // then force close that connection so on next interest() call we'll make a
+ // new connection
+ if (zh->reconfig == 1 && zh->fd != -1)
+ {
+ close(zh->fd);
+ zh->fd = -1;
+ zh->state = ZOO_NOTCONNECTED_STATE;
+ }
+
fail:
- if (zh->addrs) {
- free(zh->addrs);
- zh->addrs=0;
+
+ unlock_reconfig(zh);
+
+ // If we short-circuited out and never assigned resolved to zh->addrs then we
+ // need to free resolved to avoid a memleak.
+ if (zh->addrs.data != resolved.data)
+ {
+ addrvec_free(&resolved);
}
+
if (hosts) {
free(hosts);
+ hosts = NULL;
}
+
return rc;
}
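For intuition, a worked instance of the pOld/pNew computation above (numbers
purely illustrative):

    // Ensemble shrinks from 5 servers to 4 and the current server is not in
    // the new list; 2 of the 4 survivors were already known (num_old = 2).
    int count = 5, num_old = 2, num_new = 2;
    double pOld = ((double)(num_old * (count - (num_old + num_new))))
                / ((num_old + num_new) * (count - num_old));
    // pOld = (2 * 1) / (4 * 3) = 1/6, so pNew = 5/6: this client most likely
    // migrates to a brand-new server, keeping the expected load uniform.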
@@ -791,8 +975,9 @@ zhandle_t *zookeeper_init(const char *ho
if (!zh) {
return 0;
}
+ zh->hostname = NULL;
zh->fd = -1;
- zh->state = NOTCONNECTED_STATE_DEF;
+ zh->state = ZOO_NOTCONNECTED_STATE;
zh->context = context;
zh->recv_timeout = recv_timeout;
init_auth_info(&zh->auth_h);
@@ -805,8 +990,7 @@ zhandle_t *zookeeper_init(const char *ho
errno=EINVAL;
goto abort;
}
- //parse the host to get the chroot if
- //available
+ //parse the host to get the chroot if available
index_chroot = strchr(host, '/');
if (index_chroot) {
zh->chroot = strdup(index_chroot);
@@ -835,10 +1019,9 @@ zhandle_t *zookeeper_init(const char *ho
if (zh->hostname == 0) {
goto abort;
}
- if(getaddrs(zh)!=0) {
+ if(update_addrs(zh) != 0) {
goto abort;
}
- zh->connect_index = 0;
if (clientid) {
memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
} else {
@@ -869,6 +1052,134 @@ abort:
}
/**
+ * Set a new list of zk servers to connect to. Disconnect will occur if
+ * current connection endpoint is not in the list.
+ */
+int zoo_set_servers(zhandle_t *zh, const char *hosts)
+{
+ if (hosts == NULL)
+ {
+ LOG_ERROR(("New server list cannot be empty"));
+ return ZBADARGUMENTS;
+ }
+
+ // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+ lock_reconfig(zh);
+
+ // Reset hostname to new set of hosts to connect to
+ if (zh->hostname) {
+ free(zh->hostname);
+ }
+
+ zh->hostname = strdup(hosts);
+
+ unlock_reconfig(zh);
+
+ return update_addrs(zh);
+}
+
+/**
+ * Get the next server to connect to, when in 'reconfig' mode, which means that
+ * we've updated the server list to connect to, and are now trying to find some
+ * server to connect to. Once we get successfully connected, 'reconfig' mode is
+ * set to false. Similarly, if we tried to connect to all servers in new config
+ * and failed, 'reconfig' mode is set to false.
+ *
+ * While in 'reconfig' mode, we should connect to a server in the new set of
+ * servers (addrs_new) with probability pNew and to servers in the old set of
+ * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
+ * out all servers in one set, we continue to try servers from the other set,
+ * regardless of pNew or pOld. If we have tried all servers, we give up and go
+ * back to the normal round-robin mode.
+ *
+ * When called, must be protected by lock_reconfig(zh).
+ */
+static int get_next_server_in_reconfig(zhandle_t *zh)
+{
+ int take_new = drand48() <= zh->pNew;
+
+ LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
+ zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
+ addrvec_hasnext(&zh->addrs_old)));
+ LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
+ zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
+ addrvec_hasnext(&zh->addrs_new)));
+
+ // Take one of the new servers if we haven't tried them all yet
+ // and either the probability tells us to connect to one of the new servers
+ // or if we already tried them all then use one of the old servers
+ if (addrvec_hasnext(&zh->addrs_new)
+ && (take_new || !addrvec_hasnext(&zh->addrs_old)))
+ {
+ addrvec_next(&zh->addrs_new, &zh->addr_cur);
+ LOG_DEBUG(("Using next from NEW=%s", format_endpoint_info(&zh->addr_cur)));
+ return 0;
+ }
+
+ // start taking old servers
+ if (addrvec_hasnext(&zh->addrs_old)) {
+ addrvec_next(&zh->addrs_old, &zh->addr_cur);
+ LOG_DEBUG(("Using next from OLD=%s", format_endpoint_info(&zh->addr_cur)));
+ return 0;
+ }
+
+ LOG_DEBUG(("Failed to find either new or old"));
+ memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
+ return 1;
+}
+
+/**
+ * Cycle through our server list to the correct 'next' server. The 'next' server
+ * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
+ * mode means we've updated the server list and are now trying to find a server
+ * to connect to. Once we get connected, we are no longer in the reconfig mode.
+ * Similarly, if we try to connect to all the servers in the new configuration
+ * and failed, reconfig mode is set to false.
+ *
+ * For more algorithm details, see get_next_server_in_reconfig.
+ */
+void zoo_cycle_next_server(zhandle_t *zh)
+{
+ // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+ lock_reconfig(zh);
+
+ memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
+
+ if (zh->reconfig)
+ {
+ if (get_next_server_in_reconfig(zh) == 0) {
+ unlock_reconfig(zh);
+ return;
+ }
+
+ // tried all new and old servers and couldn't connect
+ zh->reconfig = 0;
+ }
+
+ addrvec_next(&zh->addrs, &zh->addr_cur);
+
+ unlock_reconfig(zh);
+
+ return;
+}
+
+/**
+ * Get the host:port for the server we are currently connecting to or connected
+ * to. This is largely for testing purposes but is also generally useful for
+ * other client software built on top of this client.
+ */
+const char* zoo_get_current_server(zhandle_t* zh)
+{
+ // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
+ // Need the lock here as it is changed in update_addrs()
+ lock_reconfig(zh);
+
+ const char * endpoint_info = format_endpoint_info(&zh->addr_cur);
+ unlock_reconfig(zh);
+ return endpoint_info;
+}
+
+/**
 * deallocate free_path only if it has been allocated
 * and is not equal to path
*/
@@ -1233,7 +1544,15 @@ static void handle_error(zhandle_t *zh,i
}
cleanup_bufs(zh,1,rc);
zh->fd = -1;
- zh->connect_index++;
+
+ LOG_DEBUG(("Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay));
+
+ // NOTE: If we're at the end of the list of addresses to connect to, then
+ // we want to delay the next connection attempt to avoid spinning.
+    // Then advance to the next host, since we failed to connect to the current one
+ zh->delay = addrvec_atend(&zh->addrs);
+ addrvec_next(&zh->addrs, &zh->addr_cur);
+
if (!is_unrecoverable(zh)) {
zh->state = 0;
}
@@ -1252,7 +1571,7 @@ static int handle_socket_error_msg(zhand
vsnprintf(buf, sizeof(buf)-1,format,va);
log_message(ZOO_LOG_LEVEL_ERROR,line,__func__,
format_log_message("Socket [%s] zk retcode=%d, errno=%d(%s): %s",
- format_current_endpoint_info(zh),rc,errno,strerror(errno),buf));
+ zoo_get_current_server(zh),rc,errno,strerror(errno),buf));
va_end(va);
}
handle_error(zh,rc);
@@ -1335,7 +1654,7 @@ static int send_auth_info(zhandle_t *zh)
auth = auth->next;
}
zoo_unlock_auth(zh);
- LOG_DEBUG(("Sending all auth info request to %s", format_current_endpoint_info(zh)));
+ LOG_DEBUG(("Sending all auth info request to %s", zoo_get_current_server(zh)));
return (rc <0) ? ZMARSHALLINGERROR:ZOK;
}
@@ -1352,7 +1671,7 @@ static int send_last_auth_info(zhandle_t
}
rc = send_info_packet(zh, auth);
zoo_unlock_auth(zh);
- LOG_DEBUG(("Sending auth info request to %s",format_current_endpoint_info(zh)));
+ LOG_DEBUG(("Sending auth info request to %s",zoo_get_current_server(zh)));
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
@@ -1399,7 +1718,7 @@ static int send_set_watches(zhandle_t *z
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
- LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh)));
+ LOG_DEBUG(("Sending set watches request to %s",zoo_get_current_server(zh)));
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
@@ -1540,7 +1859,6 @@ static struct timeval get_timeval(int in
int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
struct timeval *tv)
{
-
ULONG nonblocking_flag = 1;
#else
int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
@@ -1561,16 +1879,39 @@ int zookeeper_interest(zhandle_t *zh, in
LOG_WARN(("Exceeded deadline by %dms", time_left));
}
api_prolog(zh);
+
+ int rc = update_addrs(zh);
+ if (rc != ZOK) {
+ return api_epilog(zh, rc);
+ }
+
*fd = zh->fd;
*interest = 0;
tv->tv_sec = 0;
tv->tv_usec = 0;
+
if (*fd == -1) {
- if (zh->connect_index == zh->addrs_count) {
- /* Wait a bit before trying again so that we don't spin */
- zh->connect_index = 0;
- }else {
- int rc;
+
+ /*
+     * If we previously failed to connect to the server pool (zh->delay == 1),
+     * then we need to delay our connection attempt on this iteration by 1/60
+     * of the recv timeout before trying again, so we don't spin.
+ *
+ * We always clear the delay setting. If we fail again, we'll set delay
+ * again and on the next iteration we'll do the same.
+ */
+ if (zh->delay == 1) {
+ *tv = get_timeval(zh->recv_timeout/60);
+ zh->delay = 0;
+
+ LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
+ zh->hostname));
+ }
+
+ // No need to delay -- grab the next server and attempt connection
+ else {
+ zoo_cycle_next_server(zh);
+
#ifdef WIN32
char enable_tcp_nodelay = 1;
#else
@@ -1578,7 +1919,7 @@ int zookeeper_interest(zhandle_t *zh, in
#endif
int ssoresult;
- zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0);
+ zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
if (zh->fd < 0) {
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZSYSTEMERROR, "socket() call failed"));
@@ -1593,41 +1934,44 @@ int zookeeper_interest(zhandle_t *zh, in
fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
#endif
#if defined(AF_INET6)
- if (zh->addrs[zh->connect_index].ss_family == AF_INET6) {
- rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in6));
+ if (zh->addr_cur.ss_family == AF_INET6) {
+ rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
} else {
#else
LOG_DEBUG(("[zk] connect()\n"));
{
#endif
- rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in));
+ rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
#ifdef WIN32
get_errno();
#endif
}
if (rc == -1) {
+
/* we are handling the non-blocking connect according to
* the description in section 16.3 "Non-blocking connect"
* in UNIX Network Programming vol 1, 3rd edition */
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
zh->state = ZOO_CONNECTING_STATE;
else
+ {
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZCONNECTIONLOSS,"connect() call failed"));
+ }
} else {
if((rc=prime_connection(zh))!=0)
return api_epilog(zh,rc);
- LOG_INFO(("Initiated connection to server [%s]",
- format_endpoint_info(&zh->addrs[zh->connect_index])));
+ LOG_INFO(("Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
}
+ *tv = get_timeval(zh->recv_timeout/3);
}
*fd = zh->fd;
- *tv = get_timeval(zh->recv_timeout/3);
zh->last_recv = now;
zh->last_send = now;
zh->last_ping = now;
}
+
if (zh->fd != -1) {
int idle_recv = calculate_interval(&zh->last_recv, &now);
int idle_send = calculate_interval(&zh->last_send, &now);
@@ -1646,7 +1990,7 @@ int zookeeper_interest(zhandle_t *zh, in
return api_epilog(zh,handle_socket_error_msg(zh,
__LINE__,ZOPERATIONTIMEOUT,
"connection to %s timed out (exceeded timeout by %dms)",
- format_endpoint_info(&zh->addrs[zh->connect_index]),
+ format_endpoint_info(&zh->addr_cur),
-recv_to));
}
@@ -1656,8 +2000,8 @@ int zookeeper_interest(zhandle_t *zh, in
send_to = zh->recv_timeout/3 - idle_send;
if (send_to <= 0 && zh->sent_requests.head==0) {
// LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
-// format_current_endpoint_info(zh),-send_to));
- int rc=send_ping(zh);
+// zoo_get_current_server(zh),-send_to));
+ rc = send_ping(zh);
if (rc < 0){
LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
return api_epilog(zh,rc);
@@ -1677,7 +2021,7 @@ int zookeeper_interest(zhandle_t *zh, in
/* we are interested in a write if we are connected and have something
* to send, or we are waiting for a connect to finish. */
if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
- || zh->state == ZOO_CONNECTING_STATE) {
+ || zh->state == ZOO_CONNECTING_STATE) {
*interest |= ZOOKEEPER_WRITE;
}
}
@@ -1701,10 +2045,11 @@ static int check_events(zhandle_t *zh, i
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"server refused to accept the client");
}
+
if((rc=prime_connection(zh))!=0)
return rc;
- LOG_INFO(("initiated connection to server [%s]",
- format_endpoint_info(&zh->addrs[zh->connect_index])));
+
+ LOG_INFO(("initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
return ZOK;
}
if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
@@ -1745,12 +2090,13 @@ static int check_events(zhandle_t *zh, i
} else {
zh->recv_timeout = zh->primer_storage.timeOut;
zh->client_id.client_id = newid;
-
+
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
sizeof(zh->client_id.passwd));
zh->state = ZOO_CONNECTED_STATE;
+ zh->reconfig = 0;
LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
- format_endpoint_info(&zh->addrs[zh->connect_index]),
+ format_endpoint_info(&zh->addr_cur),
newid, zh->recv_timeout));
/* we want the auth to be sent for, but since both call push to front
we need to call send_watch_set first */
@@ -1866,7 +2212,7 @@ static void process_sync_completion(
completion_list_t *cptr,
struct sync_completion *sc,
struct iarchive *ia,
- zhandle_t *zh)
+ zhandle_t *zh)
{
LOG_DEBUG(("Processing sync_completion with type=%d xid=%#x rc=%d",
cptr->c.type, cptr->xid, sc->rc));
@@ -2274,9 +2620,9 @@ int zookeeper_process(zhandle_t *zh, int
struct sync_completion
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;
-
+
process_sync_completion(cptr, sc, ia, zh);
-
+
notify_sync_completion(sc);
free_buffer(bptr);
zh->outstanding_sync--;
@@ -2359,7 +2705,7 @@ static completion_list_t* create_complet
}
c->xid = xid;
c->watcher = wo;
-
+
return c;
}
@@ -2489,7 +2835,7 @@ int zookeeper_close(zhandle_t *zh)
* completions from calling zookeeper_close before we have
* completed the adaptor_finish call below. */
- /* Signal any syncronous completions before joining the threads */
+    /* Signal any synchronous completions before joining the threads */
enter_critical(zh);
free_completions(zh,1,ZCLOSING);
leave_critical(zh);
@@ -2506,7 +2852,7 @@ int zookeeper_close(zhandle_t *zh)
struct oarchive *oa;
struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_CLOSE_OP)};
LOG_INFO(("Closing zookeeper sessionId=%#llx to [%s]\n",
- zh->client_id.client_id,format_current_endpoint_info(zh)));
+ zh->client_id.client_id,zoo_get_current_server(zh)));
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
@@ -2589,7 +2935,7 @@ static int Request_path_init(zhandle_t *
char **path_out, const char *path)
{
assert(path_out);
-
+
*path_out = prepend_string(zh, path);
if (zh == NULL || !isValidPath(*path_out, flags)) {
free_duplicate_path(*path_out, path);
@@ -2656,7 +3002,7 @@ int zoo_awget(zhandle_t *zh, const char
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2701,7 +3047,7 @@ int zoo_aset(zhandle_t *zh, const char *
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2757,7 +3103,7 @@ int zoo_acreate(zhandle_t *zh, const cha
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2797,7 +3143,7 @@ int zoo_adelete(zhandle_t *zh, const cha
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2836,7 +3182,7 @@ int zoo_awexists(zhandle_t *zh, const ch
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2869,7 +3215,7 @@ static int zoo_awget_children_(zhandle_t
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2917,7 +3263,7 @@ static int zoo_awget_children2_(zhandle_
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2960,7 +3306,7 @@ int zoo_async(zhandle_t *zh, const char
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -2990,7 +3336,7 @@ int zoo_aget_acl(zhandle_t *zh, const ch
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3021,7 +3367,7 @@ int zoo_aset_acl(zhandle_t *zh, const ch
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
- format_current_endpoint_info(zh)));
+ zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
@@ -3033,16 +3379,16 @@ static void op_result_string_completion(
struct zoo_op_result *result = (struct zoo_op_result *)data;
assert(result);
result->err = err;
-
+
if (result->value && value) {
int len = strlen(value) + 1;
- if (len > result->valuelen) {
- len = result->valuelen;
- }
- if (len > 0) {
- memcpy(result->value, value, len - 1);
- result->value[len - 1] = '\0';
- }
+ if (len > result->valuelen) {
+ len = result->valuelen;
+ }
+ if (len > 0) {
+ memcpy(result->value, value, len - 1);
+ result->value[len - 1] = '\0';
+ }
} else {
result->value = NULL;
}
@@ -3100,7 +3446,7 @@ int zoo_amulti(zhandle_t *zh, int count,
struct MultiHeader mh = { STRUCT_INITIALIZER(type, op->type), STRUCT_INITIALIZER(done, 0), STRUCT_INITIALIZER(err, -1) };
rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
-
+
switch(op->type) {
case ZOO_CREATE_OP: {
struct CreateRequest req;
@@ -3111,7 +3457,7 @@ int zoo_amulti(zhandle_t *zh, int count,
op->create_op.flags);
rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
result->value = op->create_op.buf;
- result->valuelen = op->create_op.buflen;
+ result->valuelen = op->create_op.buflen;
enter_critical(zh);
entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
@@ -3169,19 +3515,19 @@ int zoo_amulti(zhandle_t *zh, int count,
}
rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
-
+
    /* BEGIN: CRITICAL SECTION */
enter_critical(zh);
rc = rc < 0 ? rc : add_multi_completion(zh, h.xid, completion, data, &clist);
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
leave_critical(zh);
-
+
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
LOG_DEBUG(("Sending multi request xid=%#x with %d subrequests to %s",
- h.xid, index, format_current_endpoint_info(zh)));
+ h.xid, index, zoo_get_current_server(zh)));
/* make a best (non-blocking) effort to send the requests asap */
adaptor_send_queue(zh, 0);
@@ -3234,12 +3580,12 @@ void zoo_check_op_init(zoo_op_t *op, con
int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
{
int rc;
-
+
struct sync_completion *sc = alloc_sync_completion();
if (!sc) {
return ZSYSTEMERROR;
}
-
+
rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
if (rc == ZOK) {
wait_sync_completion(sc);
@@ -3422,8 +3768,8 @@ int zoo_add_auth(zhandle_t *zh,const cha
static const char* format_endpoint_info(const struct sockaddr_storage* ep)
{
- static char buf[128];
- char addrstr[128];
+ static char buf[128] = { 0 };
+ char addrstr[128] = { 0 };
void *inaddr;
#ifdef WIN32
char * addrstring;
@@ -3453,11 +3799,6 @@ static const char* format_endpoint_info(
return buf;
}
-static const char* format_current_endpoint_info(zhandle_t* zh)
-{
- return format_endpoint_info(&zh->addrs[zh->connect_index]);
-}
-
void zoo_deterministic_conn_order(int yesOrNo)
{
disable_conn_permute=yesOrNo;
Added: zookeeper/trunk/src/c/tests/TestReconfig.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReconfig.cc?rev=1410731&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReconfig.cc (added)
+++ zookeeper/trunk/src/c/tests/TestReconfig.cc Sat Nov 17 14:03:03 2012
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <iostream>
+#include <sstream>
+#include <arpa/inet.h>
+#include <exception>
+#include <stdlib.h>
+
+#include "Util.h"
+#include "LibCMocks.h"
+#include "ZKMocks.h"
+
+using namespace std;
+
+static const int portOffset = 2000;
+
+class Client
+{
+
+private:
+ // Member variables
+ zhandle_t *zh;
+ unsigned int seed;
+
+public:
+ /**
+ * Create a client with the given connection host string and a per-client
+ * seed. Clients are disconnected via close(), which the fixture's
+ * tearDown() calls for every client it created.
+ */
+ Client(const string hosts, unsigned int seed) :
+ seed((seed * seed) + 0xAFAFAFAF)
+ {
+ reSeed();
+
+ zh = zookeeper_init(hosts.c_str(),0,1000,0,0,0);
+ CPPUNIT_ASSERT(zh);
+
+ reSeed();
+
+ cycleNextServer();
+ }
+
+ void close()
+ {
+ zookeeper_close(zh);
+ zh = NULL;
+ }
+
+ bool isReconfig()
+ {
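+ // A sketch of the assumed semantics: zh->reconfig stays nonzero while
+ // the client is still migrating to a newly installed server list (set
+ // by zoo_set_servers whenever the client must move off its current
+ // server).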
+ return zh->reconfig != 0;
+ }
+
+ /**
+ * Re-seed this client with its own previously generated seed so that its
+ * random choices are unique and separate from the other clients'.
+ */
+ void reSeed()
+ {
+ srandom(seed);
+ srand48(seed);
+ }
+
+ /**
+ * Get the server that this client is currently connected to.
+ */
+ string getServer()
+ {
+ const char* addrstring = zoo_get_current_server(zh);
+ return string(addrstring);
+ }
+
+ /**
+ * Get the server this client is currently connected to with no port
+ * specification.
+ */
+ string getServerNoPort()
+ {
+ string addrstring = getServer();
+
+ size_t found = addrstring.find(":");
+ CPPUNIT_ASSERT(found != string::npos);
+
+ return addrstring.substr(0, found);
+ }
+
+ /**
+ * Get the port of the server this client is currently connected to.
+ */
+ uint32_t getServerPort()
+ {
+ string addrstring = getServer();
+
+ size_t found = addrstring.find(":");
+ CPPUNIT_ASSERT(found != string::npos);
+
+ string portStr = addrstring.substr(found+1);
+
+ stringstream ss(portStr);
+ uint32_t port;
+ ss >> port;
+
+ CPPUNIT_ASSERT(port >= portOffset);
+
+ return port;
+ }
+
+ /**
+ * Cycle to the next available server on the next connect attempt, then
+ * call getServer (above) to return the server now connected to.
+ */
+ string cycleNextServer()
+ {
+ zoo_cycle_next_server(zh);
+ return getServer();
+ }
+
+ void cycleUntilServer(const string requested)
+ {
+ // Call cycleNextServer until the one it's connected to is the one
+ // specified (disregarding port).
+ string first;
+
+ while(true)
+ {
+ string next = cycleNextServer();
+ if (first.empty())
+ {
+ first = next;
+ }
+ // Else we've looped around!
+ else if (first == next)
+ {
+ CPPUNIT_ASSERT(false);
+ }
+
+ // Strip port off
+ string server = getServerNoPort();
+
+ // If it matches the requested host, we're now 'connected' to the right one
+ if (server == requested)
+ {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Set servers for this client.
+ */
+ void setServers(const string new_hosts)
+ {
+ int rc = zoo_set_servers(zh, new_hosts.c_str());
+ CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+ }
+
+ /**
+ * Set servers for this client and validate reconfig value matches expected.
+ */
+ void setServersAndVerifyReconfig(const string new_hosts, bool is_reconfig)
+ {
+ setServers(new_hosts);
+ CPPUNIT_ASSERT_EQUAL(is_reconfig, isReconfig());
+ }
+
+ /**
+ * Sets the server list this client connects to and, if this requires the
+ * client to be reconfigured (as dictated by internal client policy),
+ * triggers a call to cycleNextServer.
+ */
+ void setServersAndCycleIfNeeded(const string new_hosts)
+ {
+ setServers(new_hosts);
+ if (isReconfig())
+ {
+ cycleNextServer();
+ }
+ }
+};
+
+class Zookeeper_reconfig : public CPPUNIT_NS::TestFixture
+{
+ CPPUNIT_TEST_SUITE(Zookeeper_reconfig);
+
+ // Test cases
+ CPPUNIT_TEST(testcycleNextServer);
+ CPPUNIT_TEST(testMigrateOrNot);
+ CPPUNIT_TEST(testMigrationCycle);
+
+ // In threaded mode each 'create' is a thread -- it's not practical to create
+ // 10,000 threads to test load balancing. The load balancing code can easily
+ // be tested in single threaded mode as concurrency doesn't affect the algorithm.
+#ifndef THREADED
+ CPPUNIT_TEST(testMigrateProbability);
+ CPPUNIT_TEST(testLoadBalancing);
+#endif
+
+ CPPUNIT_TEST_SUITE_END();
+
+ FILE *logfile;
+
+ double slackPercent;
+ static const int numClients = 10000;
+ static const int portOffset = 2000;
+
+ vector<Client> clients;
+ vector<uint32_t> numClientsPerHost;
+
+public:
+ Zookeeper_reconfig() :
+ slackPercent(10.0)
+ {
+ logfile = openlogfile("Zookeeper_reconfig");
+ }
+
+ ~Zookeeper_reconfig()
+ {
+ if (logfile)
+ {
+ fflush(logfile);
+ fclose(logfile);
+ logfile = 0;
+ }
+ }
+
+ void setUp()
+ {
+ zoo_set_log_stream(logfile);
+ zoo_deterministic_conn_order(1);
+
+ numClientsPerHost.resize(numClients);
+ }
+
+ void tearDown()
+ {
+ for (int i = 0; i < clients.size(); i++)
+ {
+ clients.at(i).close();
+ }
+ }
+
+ /**
+ * Create a client with given connection host string and add to our internal
+ * vector of clients. These are disconnected and cleaned up in tearDown().
+ */
+ Client& createClient(const string hosts)
+ {
+ Client client(hosts, clients.size());
+ clients.push_back(client);
+
+ return clients.back();
+ }
+
+ /**
+ * Same as createClient(hosts), except it takes a specific host that this
+ * client should simulate being connected to.
+ */
+ Client& createClient(const string hosts, const string host)
+ {
+ // Ensure requested host is in the list
+ size_t found = hosts.find(host);
+ CPPUNIT_ASSERT(found != hosts.npos);
+
+ Client client(hosts, clients.size());
+ client.cycleUntilServer(host);
+ clients.push_back(client);
+
+ return clients.back();
+ }
+
+ /**
+ * Create a connection host list starting at 'start' and stopping at 'stop'
+ * where start >= stop. This creates a connection string with host:port pairs
+ * separated by commas. The given 'octet' is the starting octet that is used
+ * as the last octet in the host's IP. This is decremented on each iteration.
+ * Each port will be portOffset + octet.
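+ *
+ * For example, createHostList(3) yields
+ * "10.10.10.3:2003, 10.10.10.2:2002, 10.10.10.1:2001".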
+ */
+ string createHostList(uint32_t start, uint32_t stop = 1, uint32_t octet = 0)
+ {
+ if (octet == 0)
+ {
+ octet = start;
+ }
+
+ stringstream ss;
+
+ for (int i = start; i >= stop; i--, octet--)
+ {
+ ss << "10.10.10." << octet << ":" << portOffset + octet;
+
+ if (i > stop)
+ {
+ ss << ", ";
+ }
+ }
+
+ return ss.str();
+ }
+
+ /**
+ * Gets the lower bound of the number of clients per server that we expect
+ * based on the probabilistic load balancing algorithm implemented by the
+ * client code.
+ */
+ double lowerboundClientsPerServer(int numClients, int numServers)
+ {
+ return (1 - slackPercent/100.0) * numClients / numServers;
+ }
+
+ /**
+ * Gets the upper bound of the number of clients per server that we expect
+ * based on the probabilistic load balancing algorithm implemented by the
+ * client code.
+ */
+ double upperboundClientsPerServer(int numClients, int numServers)
+ {
+ return (1 + slackPercent/100.0) * numClients / numServers;
+ }
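+
+ // Worked example with this fixture's defaults (numClients = 10000,
+ // slackPercent = 10.0): for numServers = 10 the expected per-server
+ // count is 1000 clients, so the range checked by the tests below is
+ // [900, 1100].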
+
+ /**
+ * Update all the clients to use a new list of servers. This also causes
+ * each client to cycle to the next server as needed (e.g. due to a
+ * reconfig), and then updates the per-server client counts to reflect
+ * the change.
+ *
+ * Afterwards it validates that every server has the correct number of
+ * clients based on the probabilistic load balancing algorithm.
+ */
+ void updateAllClientsAndServers(int start, int stop = 1)
+ {
+ string newServers = createHostList(start, stop);
+ int numServers = start - stop + 1;
+
+ for (int i = 0; i < numClients; i++) {
+
+ Client &client = clients.at(i);
+ client.reSeed();
+
+ client.setServersAndCycleIfNeeded(newServers);
+ numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
+ }
+
+ int offset = stop - 1;
+ for (int index = offset; index < numServers; index++) {
+
+ if (numClientsPerHost.at(index) > upperboundClientsPerServer(numClients, numServers))
+ {
+ cout << "INDEX=" << index << " too many -- actual=" << numClientsPerHost.at(index)
+ << " expected=" << upperboundClientsPerServer(numClients, numServers) << endl;
+ }
+
+ CPPUNIT_ASSERT(numClientsPerHost.at(index) <= upperboundClientsPerServer(numClients, numServers));
+
+ if (numClientsPerHost.at(index) < lowerboundClientsPerServer(numClients, numServers))
+ {
+ cout << "INDEX=" << index << " too few -- actual=" << numClientsPerHost.at(index)
+ << " expected=" << lowerboundClientsPerServer(numClients, numServers) << endl;
+ }
+
+ CPPUNIT_ASSERT(numClientsPerHost.at(index) >= lowerboundClientsPerServer(numClients, numServers));
+ numClientsPerHost.at(index) = 0; // prepare for next test
+ }
+ }
+
+ /*-------------------------------------------------------------------------*
+ * TESTCASES
+ *------------------------------------------------------------------------*/
+
+ /**
+ * Very basic sunny day test to ensure basic functionality of zoo_set_servers
+ * and zoo_cycle_next_server.
+ */
+ void testcycleNextServer()
+ {
+ const string initial_hosts = createHostList(10); // 2010..2001
+ const string new_hosts = createHostList(4); // 2004..2001
+
+ Client &client = createClient(initial_hosts);
+
+ client.setServersAndVerifyReconfig(new_hosts, true);
+
+ for (int i = 0; i < 10; i++)
+ {
+ string next = client.cycleNextServer();
+ }
+ }
+
+ /**
+ * Test the migration policy implicit within the probabilistic load balancing
+ * algorithm the Client implements. Covers the corner cases where the list
+ * of servers shrinks, grows, or stays the same size, in combination with
+ * the currently connected server being in or out of the new configuration.
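+ *
+ * A sketch of the policy these assertions imply: if the currently
+ * connected server is absent from the new list the client must move;
+ * if it is present, it stays when the ensemble shrinks and moves only
+ * probabilistically when it grows (see testMigrateProbability).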
+ */
+ void testMigrateOrNot()
+ {
+ const string initial_hosts = createHostList(4); // 2004..2001
+
+ Client &client = createClient(initial_hosts, "10.10.10.3");
+
+ // Ensemble size decreasing, my server is in the new list
+ client.setServersAndVerifyReconfig(createHostList(3), false);
+
+ // Ensemble size decreasing, my server is NOT in the new list
+ client.setServersAndVerifyReconfig(createHostList(2), true);
+
+ // Ensemble size stayed the same, my server is NOT in the new list
+ client.setServersAndVerifyReconfig(createHostList(2), true);
+
+ // Ensemble size increased, my server is not in the new ensemble
+ client.setServers(createHostList(4));
+ client.cycleUntilServer("10.10.10.1");
+ client.setServersAndVerifyReconfig(createHostList(7,2), true);
+ }
+
+ /**
+ * Tests that while a client is in reconfig mode it first tries to connect
+ * to all the new servers, then to all the 'old' servers that remain in the
+ * new configuration, and finally falls back to the normal behavior of
+ * trying servers in round-robin.
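+ *
+ * Concretely, with initial_hosts = {2004..2001} and new_hosts =
+ * {2012..2003}: the eight new-only servers {2012..2005} come first, then
+ * the two staying servers {2004..2003}, then round-robin over the full
+ * new list.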
+ */
+ void testMigrationCycle()
+ {
+ int num_initial = 4;
+ const string initial_hosts = createHostList(num_initial); // {2004..2001}
+
+ int num_new = 10;
+ string new_hosts = createHostList(12, 3); // {2012..2003}
+
+ // servers from the old list that appear in the new list {2004..2003}
+ int num_staying = 2;
+ string oldStaying = createHostList(4, 3);
+
+ // servers in the new list that are not in the old list {2012..2005}
+ int num_coming = 8;
+ string newComing = createHostList(12, 5);
+
+ // Ensemble is increasing in size and my server is not in the new
+ // ensemble; load on the old servers must decrease, so the client must
+ // connect to one of the new servers (pNew = 1)
+ Client &client = createClient(initial_hosts, "10.10.10.1");
+ client.setServersAndVerifyReconfig(new_hosts, true);
+
+ // Since we're in reconfig mode, next connect should be from new list
+ // We should try all the new servers *BEFORE* trying any old servers
+ string seen;
+ for (int i = 0; i < num_coming; i++) {
+ string next = client.cycleNextServer();
+
+ // Assert next server is in the 'new' list
+ size_t found = newComing.find(next);
+ CPPUNIT_ASSERT(found != string::npos);
+
+ // Assert not in seen list then append
+ found = seen.find(next);
+ CPPUNIT_ASSERT(found == string::npos);
+ seen += next + ", ";
+ }
+
+ // Now it should start connecting to the old servers
+ seen.clear();
+ for (int i = 0; i < num_staying; i++) {
+ string next = client.cycleNextServer();
+
+ // Assert it's in the old list
+ size_t found = oldStaying.find(next);
+ CPPUNIT_ASSERT(found != string::npos);
+
+ // Assert not in seen list then append
+ found = seen.find(next);
+ CPPUNIT_ASSERT(found == string::npos);
+ seen += next + ", ";
+ }
+
+ // NOW it goes back to normal as we've tried all the new and old
+ string first = client.cycleNextServer();
+ for (int i = 0; i < num_new - 1; i++) {
+ client.cycleNextServer();
+ }
+
+ CPPUNIT_ASSERT_EQUAL(first, client.cycleNextServer());
+ }
+
+ /**
+ * Test the migration probability to ensure that it conforms to our expected
+ * lower and upper bounds of the number of clients per server as we are
+ * reconfigured.
+ *
+ * In this case, the list of servers grows and the client's server is in
+ * the new list. Whether to move depends on the ratio of the list sizes:
+ * the client disconnects with probability 1 - |old|/|new|.
+ *
+ * In the test below that gives a 1 - 9/10 = 1/10 chance of disconnecting.
+ */
+ void testMigrateProbability()
+ {
+ const string initial_hosts = createHostList(9); // 10.10.10.9:2009...10.10.10.1:2001
+ string new_hosts = createHostList(10); // 10.10.10.10:2010...10.10.10.1:2001
+
+ uint32_t numDisconnects = 0;
+ for (int i = 0; i < numClients; i++) {
+ Client &client = createClient(initial_hosts, "10.10.10.3");
+ client.setServers(new_hosts);
+ if (client.isReconfig())
+ {
+ numDisconnects++;
+ }
+ }
+
+ // Should be numClients/10 in expectation; we test that it stays below
+ // the upper bound of numClients/10 plus slackPercent.
+ CPPUNIT_ASSERT(numDisconnects < upperboundClientsPerServer(numClients, 10));
+ }
+
+ /**
+ * Tests the probabilistic load balancing algorithm implemented by the Client
+ * code.
+ *
+ * Test strategy:
+ *
+ * (1) Start with 9 servers and 10,000 clients. Remove a server, update
+ * everything, and ensure that the clients are redistributed properly.
+ *
+ * (2) Remove two more nodes and repeat the same validations of proper client
+ * redistribution. Ensure no clients are connected to the two removed
+ * nodes.
+ *
+ * (3) Remove the first server in the list and simultaneously add the three
+ * previously removed servers. Ensure everything is redistributed and
+ * no clients are connected to the one missing node.
+ *
+ * (4) Add the one missing server back into the mix and validate.
+ */
+ void testLoadBalancing()
+ {
+ zoo_deterministic_conn_order(0);
+
+ int rc = ZOK;
+
+ uint32_t numServers = 9;
+ const string initial_hosts = createHostList(numServers); // 10.10.10.9:2009...10.10.10.1:2001
+
+ // Create connections to servers
+ for (int i = 0; i < numClients; i++) {
+ Client &client = createClient(initial_hosts);
+ numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
+ }
+
+ for (int i = 0; i < numServers; i++) {
+ CPPUNIT_ASSERT(numClientsPerHost.at(i) <= upperboundClientsPerServer(numClients, numServers));
+ CPPUNIT_ASSERT(numClientsPerHost.at(i) >= lowerboundClientsPerServer(numClients, numServers));
+ numClientsPerHost.at(i) = 0; // prepare for next test
+ }
+
+ // remove last server
+ numServers = 8;
+ updateAllClientsAndServers(numServers);
+ CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
+
+ // Remove two more nodes
+ numServers = 6;
+ updateAllClientsAndServers(numServers);
+ CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
+ CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+1));
+ CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+2));
+
+ // remove host 0 (first one in list) and add back 6, 7, and 8
+ numServers = 8;
+ updateAllClientsAndServers(numServers, 1);
+ CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(0));
+
+ // add back host number 0
+ numServers = 9;
+ updateAllClientsAndServers(numServers);
+ }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_reconfig);
Modified: zookeeper/trunk/src/c/tests/TestZookeeperClose.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestZookeeperClose.cc?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestZookeeperClose.cc (original)
+++ zookeeper/trunk/src/c/tests/TestZookeeperClose.cc Sat Nov 17 14:03:03 2012
@@ -102,7 +102,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
// This cannot be maintained properly CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter);
}
void testCloseUnconnected1()
@@ -128,7 +128,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
// the close request sent?
CPPUNIT_ASSERT_EQUAL(1,zkMock.counter);
}
@@ -140,6 +140,7 @@ public:
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
CPPUNIT_ASSERT(zh!=0);
+ CPPUNIT_ASSERT_EQUAL(ZOO_NOTCONNECTED_STATE, zoo_state(zh));
Mock_gettimeofday timeMock;
@@ -173,7 +174,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
// the close request sent?
CPPUNIT_ASSERT_EQUAL(1,(int)zkServer.closeSent);
}
@@ -218,7 +219,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
// make sure the close request NOT sent
CPPUNIT_ASSERT_EQUAL(0,(int)zkServer.closeSent);
}
@@ -249,7 +250,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
// Cannot be maintained accurately: CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter);
// threads
@@ -313,7 +314,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
// threads
CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
@@ -378,7 +379,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
// threads
CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
@@ -444,7 +445,7 @@ public:
// memory
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
- CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+ CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
// threads
CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));