You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/07 23:43:45 UTC
svn commit: r1608620 - in /zookeeper/trunk: ./ src/c/ src/c/include/
src/c/src/ src/c/tests/
Author: fpj
Date: Mon Jul 7 21:43:44 2014
New Revision: 1608620
URL: http://svn.apache.org/r1608620
Log:
ZOOKEEPER-827. enable r/o mode in C client library (rgs via fpj)
Added:
zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc
zookeeper/trunk/src/c/tests/WatchUtil.h
zookeeper/trunk/src/c/tests/quorum.cfg
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/c/Makefile.am
zookeeper/trunk/src/c/README
zookeeper/trunk/src/c/include/zookeeper.h
zookeeper/trunk/src/c/src/addrvec.c
zookeeper/trunk/src/c/src/addrvec.h
zookeeper/trunk/src/c/src/cli.c
zookeeper/trunk/src/c/src/load_gen.c
zookeeper/trunk/src/c/src/zk_adaptor.h
zookeeper/trunk/src/c/src/zookeeper.c
zookeeper/trunk/src/c/tests/TestClientRetry.cc
zookeeper/trunk/src/c/tests/ZKMocks.cc
zookeeper/trunk/src/c/tests/ZKMocks.h
zookeeper/trunk/src/c/tests/zkServer.sh
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Jul 7 21:43:44 2014
@@ -32,6 +32,8 @@ NEW FEATURES:
ZOOKEEPER-1928. add configurable throttling to the number of snapshots
concurrently sent by a leader (Edward Carter via fpj)
+ ZOOKEEPER-827. enable r/o mode in C client library (rgs via fpj)
+
BUGFIXES:
ZOOKEEPER-1900. NullPointerException in truncate (Camille Fournier)
Modified: zookeeper/trunk/src/c/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/Makefile.am?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/Makefile.am (original)
+++ zookeeper/trunk/src/c/Makefile.am Mon Jul 7 21:43:44 2014
@@ -93,7 +93,9 @@ TEST_SOURCES = \
tests/TestWatchers.cc \
tests/TestClient.cc \
tests/ZooKeeperQuorumServer.cc \
- tests/ZooKeeperQuorumServer.h
+ tests/ZooKeeperQuorumServer.h \
+ tests/TestReadOnlyClient.cc \
+ $(NULL)
SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
Modified: zookeeper/trunk/src/c/README
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/README?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/README (original)
+++ zookeeper/trunk/src/c/README Mon Jul 7 21:43:44 2014
@@ -85,6 +85,10 @@ against zookeeper_st library):
$ cli_mt zookeeper_host:9876
+To start a client with read-only mode enabled, use the -r flag:
+
+$ cli_mt -r zookeeper_host:9876
+
This is a client application that gives you a shell for executing
simple zookeeper commands. Once successfully started and connected to
the server it displays a shell prompt.
Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Mon Jul 7 21:43:44 2014
@@ -121,8 +121,10 @@ enum ZOO_ERRORS {
ZCLOSING = -116, /*!< ZooKeeper is closing */
ZNOTHING = -117, /*!< (not error) no server responses to process */
ZSESSIONMOVED = -118, /*!<session moved to another server, so operation is ignored */
+ ZNOTREADONLY = -119, /*!< state-changing request is passed to read-only server */
ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
- ZNOWATCHER = -121 /*!< The watcher couldn't be found */
+ ZNOWATCHER = -121, /*!< The watcher couldn't be found */
+ ZRWSERVERFOUND = -122 /*!< r/w server found while in r/o mode */
};
#ifdef __cplusplus
@@ -147,6 +149,9 @@ extern ZOOAPI const int ZOO_PERM_ALL;
#define ZOO_CONFIG_NODE "/zookeeper/config"
+/* flags for zookeeper_init{,2} */
+#define ZOO_READONLY 1
+
/** This Id represents anyone. */
extern ZOOAPI struct Id ZOO_ANYONE_ID_UNSAFE;
/** This Id is only usable to set ACLs. It will get substituted with the
@@ -196,6 +201,7 @@ extern ZOOAPI const int ZOO_AUTH_FAILED_
extern ZOOAPI const int ZOO_CONNECTING_STATE;
extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
extern ZOOAPI const int ZOO_CONNECTED_STATE;
+extern ZOOAPI const int ZOO_READONLY_STATE;
extern ZOOAPI const int ZOO_NOTCONNECTED_STATE;
// @}
Modified: zookeeper/trunk/src/c/src/addrvec.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.c (original)
+++ zookeeper/trunk/src/c/src/addrvec.c Mon Jul 7 21:43:44 2014
@@ -194,21 +194,45 @@ int addrvec_atend(const addrvec_t *avec)
void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next)
{
+ int index;
+
// If we're at the end of the list, then reset index to start
- if (addrvec_atend(avec))
- {
+ if (addrvec_atend(avec)) {
avec->next = 0;
}
- if (!addrvec_hasnext(avec))
- {
+ if (!addrvec_hasnext(avec)) {
+ if (next) {
+ memset(next, 0, sizeof(*next));
+ }
+
+ return;
+ }
+
+ index = avec->next++;
+
+ if (next) {
+ *next = avec->data[index];
+ }
+}
+
+void addrvec_peek(addrvec_t *avec, struct sockaddr_storage *next)
+{
+ int index = avec->next;
+
+ if (avec->count == 0) {
memset(next, 0, sizeof(*next));
return;
}
- *next = avec->data[avec->next++];
+ if (addrvec_atend(avec)) {
+ index = 0;
+ }
+
+ *next = avec->data[index];
}
+
int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2)
{
uint32_t i = 0;
Modified: zookeeper/trunk/src/c/src/addrvec.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/addrvec.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/addrvec.h (original)
+++ zookeeper/trunk/src/c/src/addrvec.h Mon Jul 7 21:43:44 2014
@@ -111,7 +111,9 @@ int addrvec_hasnext(const addrvec_t *ave
int addrvec_atend(const addrvec_t *avec);
/**
- * Get the next entry from the addrvec and update the associated index.
+ * Get the next entry from the addrvec and update the associated index.
+ *
+ * If next is NULL, the index will still be updated.
*
* If the current index points at (or after) the last element in the vector then
* it will loop back around and start at the beginning of the list.
@@ -119,6 +121,11 @@ int addrvec_atend(const addrvec_t *avec)
void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next);
/**
+ * Retrieves the next entry from the addrvec but doesn't update the index.
+ */
+void addrvec_peek(addrvec_t *avec, struct sockaddr_storage *next);
+
+/**
* Compare two addrvecs for equality.
*
* \returns 1 if the contents of the two lists are identical and and 0 otherwise.
Modified: zookeeper/trunk/src/c/src/cli.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/cli.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/cli.c (original)
+++ zookeeper/trunk/src/c/src/cli.c Mon Jul 7 21:43:44 2014
@@ -76,6 +76,8 @@ static const char* state2String(int stat
return "ASSOCIATING_STATE";
if (state == ZOO_CONNECTED_STATE)
return "CONNECTED_STATE";
+ if (state == ZOO_READONLY_STATE)
+ return "READONLY_STATE";
if (state == ZOO_EXPIRED_SESSION_STATE)
return "EXPIRED_SESSION_STATE";
if (state == ZOO_AUTH_FAILED_STATE)
@@ -661,6 +663,7 @@ int main(int argc, char **argv) {
char appId[64];
#endif
int bufoff = 0;
+ int flags, i;
FILE *fh;
if (argc < 2) {
@@ -690,6 +693,15 @@ int main(int argc, char **argv) {
}
}
}
+
+ flags = 0;
+ for (i = 1; i < argc; ++i) {
+ if (strcmp("-r", argv[i]) == 0) {
+ flags = ZOO_READONLY;
+ break;
+ }
+ }
+
#ifdef YCA
strcpy(appId,"yahoo.example.yca_test");
cert = yca_get_cert_once(appId);
@@ -708,7 +720,7 @@ int main(int argc, char **argv) {
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zoo_deterministic_conn_order(1); // enable deterministic order
hostPort = argv[1];
- zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
+ zh = zookeeper_init(hostPort, watcher, 30000, &myid, NULL, flags);
if (!zh) {
return errno;
}
Modified: zookeeper/trunk/src/c/src/load_gen.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/load_gen.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/load_gen.c (original)
+++ zookeeper/trunk/src/c/src/load_gen.c Mon Jul 7 21:43:44 2014
@@ -75,8 +75,8 @@ void waitCounter(){
}
void listener(zhandle_t *zzh, int type, int state, const char *path,void* ctx) {
- if(type == ZOO_SESSION_EVENT){
- if(state == ZOO_CONNECTED_STATE){
+ if (type == ZOO_SESSION_EVENT) {
+ if (state == ZOO_CONNECTED_STATE || state == ZOO_READONLY_STATE) {
pthread_mutex_lock(&lock);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
Modified: zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ zookeeper/trunk/src/c/src/zk_adaptor.h Mon Jul 7 21:43:44 2014
@@ -42,6 +42,7 @@
#define CONNECTING_STATE_DEF 1
#define ASSOCIATING_STATE_DEF 2
#define CONNECTED_STATE_DEF 3
+#define READONLY_STATE_DEF 5
#define NOTCONNECTED_STATE_DEF 999
/* zookeeper event type constants */
@@ -131,7 +132,7 @@ typedef struct _buffer_list {
} buffer_list_t;
/* the size of connect request */
-#define HANDSHAKE_REQ_SIZE 44
+#define HANDSHAKE_REQ_SIZE 45
/* connect request */
struct connect_req {
int32_t protocolVersion;
@@ -140,6 +141,7 @@ struct connect_req {
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
+ char readOnly;
};
/* the connect response */
@@ -150,6 +152,7 @@ struct prime_struct {
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
+ char readOnly;
};
#ifdef THREADED
@@ -217,6 +220,10 @@ struct _zhandle {
completion_head_t completions_to_process; // completions that are ready to run
int outstanding_sync; // number of outstanding synchronous requests
+ /* read-only mode specific fields */
+ struct timeval last_ping_rw; /* The last time we checked server for being r/w */
+ int ping_rw_timeout; /* The time that can go by before checking next server */
+
// State info
volatile int state; // Current zookeeper state
void *context; // client-side provided context
@@ -228,7 +235,7 @@ struct _zhandle {
// Primer storage
struct _buffer_list primer_buffer; // The buffer used for the handshake at the start of a connection
struct prime_struct primer_storage; // the connect response
- char primer_storage_buffer[40]; // the true size of primer_storage
+ char primer_storage_buffer[41]; // the true size of primer_storage
/* zookeeper_close is not reentrant because it de-allocates the zhandler.
* This guard variable is used to defer the destruction of zhandle till
@@ -249,6 +256,11 @@ struct _zhandle {
/** used for chroot path at the client side **/
char *chroot;
+
+ /** Indicates if this client is allowed to go to r/o mode */
+ char allow_read_only;
+ /** Indicates if we connected to a majority server before */
+ char seen_rw_server_before;
};
Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Mon Jul 7 21:43:44 2014
@@ -41,7 +41,7 @@
#include <stdarg.h>
#include <limits.h>
-#ifndef WIN32
+#ifndef _WIN32
#include <sys/time.h>
#include <sys/socket.h>
#include <poll.h>
@@ -76,6 +76,7 @@ const int ZOO_AUTH_FAILED_STATE = AUTH_F
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
+const int ZOO_READONLY_STATE = READONLY_STATE_DEF;
const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
static __attribute__ ((unused)) const char* state2String(int state){
@@ -88,6 +89,8 @@ static __attribute__ ((unused)) const ch
return "ZOO_ASSOCIATING_STATE";
case CONNECTED_STATE_DEF:
return "ZOO_CONNECTED_STATE";
+ case READONLY_STATE_DEF:
+ return "ZOO_READONLY_STATE";
case EXPIRED_SESSION_STATE_DEF:
return "ZOO_EXPIRED_SESSION_STATE";
case AUTH_FAILED_STATE_DEF:
@@ -225,17 +228,28 @@ static __attribute__((unused)) void prin
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
static int isValidPath(const char* path, const int flags);
-#ifdef _WINDOWS
-static int zookeeper_send(SOCKET s, const void* buf, int len)
+#ifdef _WIN32
+typedef SOCKET socket_t;
+typedef int sendsize_t;
+#define SEND_FLAGS 0
#else
-static ssize_t zookeeper_send(int s, const void* buf, size_t len)
+#ifdef __APPLE__
+#define MSG_NOSIGNAL SO_NOSIGPIPE
#endif
-{
-#ifdef __linux__
- return send(s, buf, len, MSG_NOSIGNAL);
-#else
- return send(s, buf, len, 0);
+typedef int socket_t;
+typedef ssize_t sendsize_t;
+#define SEND_FLAGS MSG_NOSIGNAL
#endif
+
+static void zookeeper_set_sock_nodelay(zhandle_t *, socket_t);
+static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
+static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
+static int zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
+
+
+static sendsize_t zookeeper_send(socket_t s, const void* buf, size_t len)
+{
+ return send(s, buf, len, SEND_FLAGS);
}
const void *zoo_get_context(zhandle_t *zh)
@@ -438,7 +452,7 @@ static void destroy(zhandle_t *zh)
static void setup_random()
{
-#ifndef WIN32 // TODO: better seed
+#ifndef _WIN32 // TODO: better seed
int seed;
int fd = open("/dev/urandom", O_RDONLY);
if (fd == -1) {
@@ -651,7 +665,7 @@ static int resolve_hosts(const zhandle_t
#endif
if (rc != 0) {
errno = getaddrinfo_errno(rc);
-#ifdef WIN32
+#ifdef _WIN32
LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
#elif __linux__ && __GNUC__
LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
@@ -990,7 +1004,7 @@ static zhandle_t *zookeeper_init_interna
zh->log_callback = log_callback;
log_env(zh);
-#ifdef WIN32
+#ifdef _WIN32
if (Win32WSAStartup()){
LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
return 0;
@@ -1012,6 +1026,9 @@ static zhandle_t *zookeeper_init_interna
zh->state = ZOO_NOTCONNECTED_STATE;
zh->context = context;
zh->recv_timeout = recv_timeout;
+ zh->allow_read_only = flags & ZOO_READONLY;
+ // non-zero clientid implies we've seen r/w server already
+ zh->seen_rw_server_before = (clientid != 0 && clientid->client_id != 0);
init_auth_info(&zh->auth_h);
if (watcher) {
zh->watcher = watcher;
@@ -1054,6 +1071,7 @@ static zhandle_t *zookeeper_init_interna
if(update_addrs(zh) != 0) {
goto abort;
}
+
if (clientid) {
memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
} else {
@@ -1382,11 +1400,7 @@ static __attribute__ ((unused)) int get_
* 0 if send would block while sending the buffer (or a send was incomplete),
* 1 if success
*/
-#ifdef WIN32
-static int send_buffer(SOCKET fd, buffer_list_t *buff)
-#else
-static int send_buffer(int fd, buffer_list_t *buff)
-#endif
+static int send_buffer(socket_t fd, buffer_list_t *buff)
{
int len = buff->len;
int off = buff->curr_offset;
@@ -1398,10 +1412,10 @@ static int send_buffer(int fd, buffer_li
char *b = (char*)&nlen;
rc = zookeeper_send(fd, b + off, sizeof(nlen) - off);
if (rc == -1) {
-#ifndef _WINDOWS
- if (errno != EAGAIN) {
-#else
+#ifdef _WIN32
if (WSAGetLastError() != WSAEWOULDBLOCK) {
+#else
+ if (errno != EAGAIN) {
#endif
return -1;
} else {
@@ -1417,10 +1431,10 @@ static int send_buffer(int fd, buffer_li
off -= sizeof(buff->len);
rc = zookeeper_send(fd, buff->buffer + off, len - off);
if (rc == -1) {
-#ifndef _WINDOWS
- if (errno != EAGAIN) {
-#else
+#ifdef _WIN32
if (WSAGetLastError() != WSAEWOULDBLOCK) {
+#else
+ if (errno != EAGAIN) {
#endif
return -1;
}
@@ -1436,29 +1450,23 @@ static int send_buffer(int fd, buffer_li
* 0 if recv would block,
* 1 if success
*/
-#ifdef WIN32
-static int recv_buffer(SOCKET fd, buffer_list_t *buff)
-#else
-static int recv_buffer(int fd, buffer_list_t *buff)
-#endif
+static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
{
int off = buff->curr_offset;
int rc = 0;
- //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
/* if buffer is less than 4, we are reading in the length */
if (off < 4) {
char *buffer = (char*)&(buff->len);
- rc = recv(fd, buffer+off, sizeof(int)-off, 0);
- //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
- switch(rc) {
+ rc = recv(zh->fd, buffer+off, sizeof(int)-off, 0);
+ switch (rc) {
case 0:
errno = EHOSTDOWN;
case -1:
-#ifndef _WINDOWS
- if (errno == EAGAIN) {
-#else
+#ifdef _WIN32
if (WSAGetLastError() == WSAEWOULDBLOCK) {
+#else
+ if (errno == EAGAIN) {
#endif
return 0;
}
@@ -1476,15 +1484,21 @@ static int recv_buffer(int fd, buffer_li
/* want off to now represent the offset into the buffer */
off -= sizeof(buff->len);
- rc = recv(fd, buff->buffer+off, buff->len-off, 0);
+ rc = recv(zh->fd, buff->buffer+off, buff->len-off, 0);
+
+ /* dirty hack to make new client work against old server
+ * old server sends 40 bytes to finish connection handshake,
+ * while we're expecting 41 (1 byte for read-only mode data) */
+ if (buff == &zh->primer_buffer && rc == buff->len - 1) ++rc;
+
switch(rc) {
case 0:
errno = EHOSTDOWN;
case -1:
-#ifndef _WINDOWS
- if (errno == EAGAIN) {
-#else
+#ifdef _WIN32
if (WSAGetLastError() == WSAEWOULDBLOCK) {
+#else
+ if (errno == EAGAIN) {
#endif
break;
}
@@ -1578,6 +1592,13 @@ static void cleanup_bufs(zhandle_t *zh,i
}
}
+/* return 1 if zh's state is ZOO_CONNECTED_STATE or ZOO_READONLY_STATE,
+ * 0 otherwise */
+static int is_connected(zhandle_t* zh)
+{
+ return (zh->state==ZOO_CONNECTED_STATE || zh->state==ZOO_READONLY_STATE);
+}
+
static void handle_error(zhandle_t *zh,int rc)
{
close(zh->fd);
@@ -1585,7 +1606,7 @@ static void handle_error(zhandle_t *zh,i
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
state2String(zh->state));
PROCESS_SESSION_EVENT(zh, zh->state);
- } else if (zh->state == ZOO_CONNECTED_STATE) {
+ } else if (is_connected(zh)) {
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
}
@@ -1793,36 +1814,46 @@ static int serialize_prime_connect(struc
offset = offset + sizeof(req->passwd_len);
memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
+ offset = offset + sizeof(req->passwd);
+
+ memcpy(buffer + offset, &req->readOnly, sizeof(req->readOnly));
return 0;
}
- static int deserialize_prime_response(struct prime_struct *req, char* buffer){
+static int deserialize_prime_response(struct prime_struct *resp, char* buffer)
+{
//this should be the order of deserialization
int offset = 0;
- memcpy(&req->len, buffer + offset, sizeof(req->len));
- offset = offset + sizeof(req->len);
+ memcpy(&resp->len, buffer + offset, sizeof(resp->len));
+ offset = offset + sizeof(resp->len);
+
+ resp->len = ntohl(resp->len);
+ memcpy(&resp->protocolVersion,
+ buffer + offset,
+ sizeof(resp->protocolVersion));
+ offset = offset + sizeof(resp->protocolVersion);
+
+ resp->protocolVersion = ntohl(resp->protocolVersion);
+ memcpy(&resp->timeOut, buffer + offset, sizeof(resp->timeOut));
+ offset = offset + sizeof(resp->timeOut);
+
+ resp->timeOut = ntohl(resp->timeOut);
+ memcpy(&resp->sessionId, buffer + offset, sizeof(resp->sessionId));
+ offset = offset + sizeof(resp->sessionId);
+
+ resp->sessionId = htonll(resp->sessionId);
+ memcpy(&resp->passwd_len, buffer + offset, sizeof(resp->passwd_len));
+ offset = offset + sizeof(resp->passwd_len);
+
+ resp->passwd_len = ntohl(resp->passwd_len);
+ memcpy(resp->passwd, buffer + offset, sizeof(resp->passwd));
+ offset = offset + sizeof(resp->passwd);
- req->len = ntohl(req->len);
- memcpy(&req->protocolVersion, buffer + offset, sizeof(req->protocolVersion));
- offset = offset + sizeof(req->protocolVersion);
-
- req->protocolVersion = ntohl(req->protocolVersion);
- memcpy(&req->timeOut, buffer + offset, sizeof(req->timeOut));
- offset = offset + sizeof(req->timeOut);
-
- req->timeOut = ntohl(req->timeOut);
- memcpy(&req->sessionId, buffer + offset, sizeof(req->sessionId));
- offset = offset + sizeof(req->sessionId);
-
- req->sessionId = htonll(req->sessionId);
- memcpy(&req->passwd_len, buffer + offset, sizeof(req->passwd_len));
- offset = offset + sizeof(req->passwd_len);
+ memcpy(&resp->readOnly, buffer + offset, sizeof(resp->readOnly));
- req->passwd_len = ntohl(req->passwd_len);
- memcpy(req->passwd, buffer + offset, sizeof(req->passwd));
return 0;
- }
+}
static int prime_connection(zhandle_t *zh)
{
@@ -1833,11 +1864,12 @@ static int prime_connection(zhandle_t *z
int hlen = 0;
struct connect_req req;
req.protocolVersion = 0;
- req.sessionId = zh->client_id.client_id;
+ req.sessionId = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
req.passwd_len = sizeof(req.passwd);
memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
req.timeOut = zh->recv_timeout;
req.lastZxidSeen = zh->last_zxid;
+ req.readOnly = zh->allow_read_only;
hlen = htonl(len);
/* We are running fast and loose here, but this string should fit in the initial buffer! */
rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
@@ -1850,6 +1882,8 @@ static int prime_connection(zhandle_t *z
zh->state = ZOO_ASSOCIATING_STATE;
zh->input_buffer = &zh->primer_buffer;
+ memset(zh->input_buffer->buffer, 0, zh->input_buffer->len);
+
/* This seems a bit weird to to set the offset to 4, but we already have a
* length, so we skip reading the length (and allocating the buffer) by
* saying that we are already at offset 4 */
@@ -1905,16 +1939,128 @@ static struct timeval get_timeval(int in
return rc<0 ? rc : adaptor_send_queue(zh, 0);
}
-#ifdef WIN32
-int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
- struct timeval *tv)
+/* upper bound of a timeout for seeking for r/w server when in read-only mode */
+const int MAX_RW_TIMEOUT = 60000;
+const int MIN_RW_TIMEOUT = 200;
+
+static int ping_rw_server(zhandle_t* zh)
{
+ char buf[10];
+ socket_t sock;
+ int rc;
+ sendsize_t ssize;
+ struct sockaddr_storage addr;
+
+ addrvec_peek(&zh->addrs, &addr);
+
+ sock = socket(addr.ss_family, SOCK_STREAM, 0);
+ if (sock < 0) {
+ return 0;
+ }
+
+ zookeeper_set_sock_nodelay(zh, sock);
+ zookeeper_set_sock_timeout(zh, sock, 1);
+
+ rc = zookeeper_connect(zh, &addr, sock);
+ if (rc < 0) {
+ return 0;
+ }
+
+ ssize = zookeeper_send(sock, "isro", 4);
+ if (ssize < 0) {
+ rc = 0;
+ goto out;
+ }
+
+ memset(buf, 0, sizeof(buf));
+ rc = recv(sock, buf, sizeof(buf), 0);
+ if (rc < 0) {
+ rc = 0;
+ goto out;
+ }
+
+ rc = strcmp("rw", buf) == 0;
+
+out:
+ close(sock);
+ return rc;
+}
+
+static inline int min(int a, int b)
+{
+ return a < b ? a : b;
+}
+
+static void zookeeper_set_sock_noblock(zhandle_t *zh, socket_t sock)
+{
+#ifdef _WIN32
ULONG nonblocking_flag = 1;
+
+ ioctlsocket(sock, FIONBIO, &nonblocking_flag);
#else
-int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
- struct timeval *tv)
+ fcntl(sock, F_SETFL, O_NONBLOCK|fcntl(sock, F_GETFL, 0));
+#endif
+}
+
+static void zookeeper_set_sock_timeout(zhandle_t *zh, socket_t s, int timeout)
{
+ struct timeval tv = { .tv_sec = timeout };
+
+ setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
+ setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
+}
+
+static void zookeeper_set_sock_nodelay(zhandle_t *zh, socket_t sock)
+{
+#ifdef _WIN32
+ char enable_tcp_nodelay = 1;
+#else
+ int enable_tcp_nodelay = 1;
+#endif
+ int rc;
+
+ rc = setsockopt(sock,
+ IPPROTO_TCP,
+ TCP_NODELAY,
+ &enable_tcp_nodelay,
+ sizeof(enable_tcp_nodelay));
+
+ if (rc) {
+ LOG_WARN(LOGCALLBACK(zh),
+ "Unable to set TCP_NODELAY, latency may be effected");
+ }
+}
+
+static socket_t zookeeper_connect(zhandle_t *zh,
+ struct sockaddr_storage *addr,
+ socket_t fd)
+{
+ int rc;
+ int addr_len;
+
+#if defined(AF_INET6)
+ if (addr->ss_family == AF_INET6) {
+ addr_len = sizeof(struct sockaddr_in6);
+ } else {
+ addr_len = sizeof(struct sockaddr_in);
+ }
+#else
+ addr_len = sizeof(struct sockaddr_in);
#endif
+
+ LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
+ rc = connect(fd, (struct sockaddr *)addr, addr_len);
+
+#ifdef _WIN32
+ get_errno();
+#endif
+
+ return rc;
+}
+
+int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
+ struct timeval *tv)
+{
int rc = 0;
struct timeval now;
if(zh==0 || fd==0 ||interest==0 || tv==0)
@@ -1942,7 +2088,6 @@ int zookeeper_interest(zhandle_t *zh, in
tv->tv_usec = 0;
if (*fd == -1) {
-
/*
* If we previously failed to connect to server pool (zh->delay == 1)
* then we need delay our connection on this iteration 1/60 of the
@@ -1957,63 +2102,45 @@ int zookeeper_interest(zhandle_t *zh, in
LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
zh->hostname);
- }
-
- // No need to delay -- grab the next server and attempt connection
- else {
- int ssoresult;
-
-#ifdef WIN32
- char enable_tcp_nodelay = 1;
-#else
- int enable_tcp_nodelay = 1;
-#endif
+ } else {
+ // No need to delay -- grab the next server and attempt connection
zoo_cycle_next_server(zh);
-
zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
if (zh->fd < 0) {
- return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
- ZSYSTEMERROR, "socket() call failed"));
- }
- ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
- if (ssoresult != 0) {
- LOG_WARN(LOGCALLBACK(zh), "Unable to set TCP_NODELAY, operation latency may be effected");
+ rc = handle_socket_error_msg(zh,
+ __LINE__,
+ ZSYSTEMERROR,
+ "socket() call failed");
+ return api_epilog(zh, rc);
}
-#ifdef WIN32
- ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
-#else
- fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
-#endif
-#if defined(AF_INET6)
- if (zh->addr_cur.ss_family == AF_INET6) {
- rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
- } else {
-#else
- LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
- {
-#endif
- rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
-#ifdef WIN32
- get_errno();
-#endif
- }
- if (rc == -1) {
+ zookeeper_set_sock_nodelay(zh, zh->fd);
+ zookeeper_set_sock_noblock(zh, zh->fd);
+
+ rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd);
+
+ if (rc == -1) {
/* we are handling the non-blocking connect according to
* the description in section 16.3 "Non-blocking connect"
* in UNIX Network Programming vol 1, 3rd edition */
- if (errno == EWOULDBLOCK || errno == EINPROGRESS)
+ if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
zh->state = ZOO_CONNECTING_STATE;
- else
- {
- return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
- ZCONNECTIONLOSS,"connect() call failed"));
+ } else {
+ rc = handle_socket_error_msg(zh,
+ __LINE__,
+ ZCONNECTIONLOSS,
+ "connect() call failed");
+ return api_epilog(zh, rc);
}
} else {
- if((rc=prime_connection(zh))!=0)
+ rc = prime_connection(zh);
+ if (rc != 0) {
return api_epilog(zh,rc);
+ }
- LOG_INFO(LOGCALLBACK(zh), "Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
+ LOG_INFO(LOGCALLBACK(zh),
+ "Initiated connection to server [%s]",
+ format_endpoint_info(&zh->addr_cur));
}
*tv = get_timeval(zh->recv_timeout/3);
}
@@ -2021,6 +2148,8 @@ int zookeeper_interest(zhandle_t *zh, in
zh->last_recv = now;
zh->last_send = now;
zh->last_ping = now;
+ zh->last_ping_rw = now;
+ zh->ping_rw_timeout = MIN_RW_TIMEOUT;
}
if (zh->fd != -1) {
@@ -2031,7 +2160,7 @@ int zookeeper_interest(zhandle_t *zh, in
// have we exceeded the receive timeout threshold?
if (recv_to <= 0) {
// We gotta cut our losses and connect to someone else
-#ifdef WIN32
+#ifdef _WIN32
errno = WSAETIMEDOUT;
#else
errno = ETIMEDOUT;
@@ -2045,14 +2174,13 @@ int zookeeper_interest(zhandle_t *zh, in
-recv_to));
}
+
// We only allow 1/3 of our timeout time to expire before sending
// a PING
- if (zh->state==ZOO_CONNECTED_STATE) {
+ if (is_connected(zh)) {
send_to = zh->recv_timeout/3 - idle_send;
if (send_to <= 0) {
if (zh->sent_requests.head == 0) {
-// LOG_DEBUG(LOGCALLBACK(zh), "Sending PING to %s (exceeded idle by %dms)",
-// zoo_get_current_server(zh),-send_to);
rc = send_ping(zh);
if (rc < 0) {
LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
@@ -2062,8 +2190,33 @@ int zookeeper_interest(zhandle_t *zh, in
send_to = zh->recv_timeout/3;
}
}
+
+ // If we are in read-only mode, seek for read/write server
+ if (zh->state == ZOO_READONLY_STATE) {
+ int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
+ if (idle_ping_rw >= zh->ping_rw_timeout) {
+ zh->last_ping_rw = now;
+ idle_ping_rw = 0;
+ zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
+ MAX_RW_TIMEOUT);
+ if (ping_rw_server(zh)) {
+ struct sockaddr_storage addr;
+ addrvec_peek(&zh->addrs, &addr);
+ zh->ping_rw_timeout = MIN_RW_TIMEOUT;
+ LOG_INFO(LOGCALLBACK(zh),
+ "r/w server found at %s",
+ format_endpoint_info(&addr));
+ handle_error(zh, ZRWSERVERFOUND);
+ } else {
+ addrvec_next(&zh->addrs, NULL);
+ }
+ }
+ send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
+ }
+
// choose the lesser value as the timeout
- *tv = get_timeval(recv_to < send_to? recv_to:send_to);
+ *tv = get_timeval(min(recv_to, send_to));
+
zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
if (zh->next_deadline.tv_usec > 1000000) {
@@ -2073,7 +2226,7 @@ int zookeeper_interest(zhandle_t *zh, in
*interest = ZOOKEEPER_READ;
/* we are interested in a write if we are connected and have something
* to send, or we are waiting for a connect to finish. */
- if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
+ if ((zh->to_send.head && is_connected(zh))
|| zh->state == ZOO_CONNECTING_STATE) {
*interest |= ZOOKEEPER_WRITE;
}
@@ -2118,7 +2271,7 @@ static int check_events(zhandle_t *zh, i
zh->input_buffer = allocate_buffer(0,0);
}
- rc = recv_buffer(zh->fd, zh->input_buffer);
+ rc = recv_buffer(zh, zh->input_buffer);
if (rc < 0) {
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"failed while receiving a server response");
@@ -2128,12 +2281,13 @@ static int check_events(zhandle_t *zh, i
if (zh->input_buffer != &zh->primer_buffer) {
queue_buffer(&zh->to_process, zh->input_buffer, 0);
} else {
- int64_t oldid,newid;
+ int64_t oldid, newid;
//deserialize
deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
/* We are processing the primer_buffer, so we need to finish
* the connection handshake */
- oldid = zh->client_id.client_id;
+ oldid = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
+ zh->seen_rw_server_before |= !zh->primer_storage.readOnly;
newid = zh->primer_storage.sessionId;
if (oldid != 0 && oldid != newid) {
zh->state = ZOO_EXPIRED_SESSION_STATE;
@@ -2146,11 +2300,14 @@ static int check_events(zhandle_t *zh, i
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
sizeof(zh->client_id.passwd));
- zh->state = ZOO_CONNECTED_STATE;
+ zh->state = zh->primer_storage.readOnly ?
+ ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
zh->reconfig = 0;
- LOG_INFO(LOGCALLBACK(zh), "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
- format_endpoint_info(&zh->addr_cur),
- newid, zh->recv_timeout);
+ LOG_INFO(LOGCALLBACK(zh),
+ "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d %s",
+ format_endpoint_info(&zh->addr_cur),
+ newid, zh->recv_timeout,
+ zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
/* we want the auth to be sent for, but since both call push to front
we need to call send_watch_set first */
send_set_watches(zh);
@@ -2158,7 +2315,7 @@ static int check_events(zhandle_t *zh, i
send_auth_info(zh);
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
- PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
+ PROCESS_SESSION_EVENT(zh, zh->state);
}
}
zh->input_buffer = 0;
@@ -2555,7 +2712,7 @@ void process_completions(zhandle_t *zh)
static void isSocketReadable(zhandle_t* zh)
{
-#ifndef WIN32
+#ifndef _WIN32
struct pollfd fds;
fds.fd = zh->fd;
fds.events = POLLIN;
@@ -2724,7 +2881,9 @@ int zookeeper_process(zhandle_t *zh, int
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
}
- return api_epilog(zh,ZOK);}
+
+ return api_epilog(zh, ZOK);
+}
int zoo_state(zhandle_t *zh)
{
@@ -3000,7 +3159,7 @@ int zookeeper_close(zhandle_t *zh)
}
/* No need to decrement the counter since we're just going to
* destroy the handle later. */
- if(zh->state==ZOO_CONNECTED_STATE){
+ if (is_connected(zh)){
struct oarchive *oa;
struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to [%s]\n",
@@ -3029,7 +3188,7 @@ finish:
destroy(zh);
adaptor_destroy(zh);
free(zh);
-#ifdef WIN32
+#ifdef _WIN32
Win32WSACleanup();
#endif
return rc;
@@ -3905,7 +4064,7 @@ int flush_send_queue(zhandle_t*zh, int t
{
int rc= ZOK;
struct timeval started;
-#ifdef WIN32
+#ifdef _WIN32
fd_set pollSet;
struct timeval wait;
#endif
@@ -3915,9 +4074,9 @@ int flush_send_queue(zhandle_t*zh, int t
// we use a recursive lock instead and only dequeue the buffer if a send was
// successful
lock_buffer_list(&zh->to_send);
- while (zh->to_send.head != 0&& zh->state == ZOO_CONNECTED_STATE) {
+ while (zh->to_send.head != 0 && is_connected(zh)) {
if(timeout!=0){
-#ifndef WIN32
+#ifndef _WIN32
struct pollfd fds;
#endif
int elapsed;
@@ -3929,7 +4088,7 @@ int flush_send_queue(zhandle_t*zh, int t
break;
}
-#ifdef WIN32
+#ifdef _WIN32
wait = get_timeval(timeout-elapsed);
FD_ZERO(&pollSet);
FD_SET(zh->fd, &pollSet);
@@ -4019,6 +4178,8 @@ const char* zerror(int c)
return "(not error) no server responses to process";
case ZSESSIONMOVED:
return "session moved to another server, so operation is ignored";
+ case ZNOTREADONLY:
+ return "state-changing request is passed to read-only server";
case ZNEWCONFIGNOQUORUM:
return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
case ZRECONFIGINPROGRESS:
@@ -4069,7 +4230,7 @@ int zoo_add_auth(zhandle_t *zh,const cha
add_last_auth(&zh->auth_h, authinfo);
zoo_unlock_auth(zh);
- if(zh->state == ZOO_CONNECTED_STATE || zh->state == ZOO_ASSOCIATING_STATE)
+ if (is_connected(zh) || zh->state == ZOO_ASSOCIATING_STATE)
return send_last_auth_info(zh);
return ZOK;
@@ -4080,7 +4241,7 @@ static const char* format_endpoint_info(
static char buf[128] = { 0 };
char addrstr[128] = { 0 };
void *inaddr;
-#ifdef WIN32
+#ifdef _WIN32
char * addrstring;
#endif
int port;
@@ -4098,7 +4259,7 @@ static const char* format_endpoint_info(
#if defined(AF_INET6)
}
#endif
-#ifdef WIN32
+#ifdef _WIN32
addrstring = inet_ntoa (*(struct in_addr*)inaddr);
sprintf(buf,"%s:%d",addrstring,ntohs(port));
#else
Modified: zookeeper/trunk/src/c/tests/TestClientRetry.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestClientRetry.cc?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestClientRetry.cc (original)
+++ zookeeper/trunk/src/c/tests/TestClientRetry.cc Mon Jul 7 21:43:44 2014
@@ -22,140 +22,14 @@
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
-#include <sys/select.h>
-
-#include "CollectionUtil.h"
-#include "ThreadingUtil.h"
-
-using namespace Util;
#include "Vector.h"
using namespace std;
-#include <cstring>
-#include <list>
-
#include <zookeeper.h>
#include "Util.h"
-
-#ifdef THREADED
- static void yield(zhandle_t *zh, int i)
- {
- sleep(i);
- }
-#else
- static void yield(zhandle_t *zh, int seconds)
- {
- int fd;
- int interest;
- int events;
- struct timeval tv;
- int rc;
- time_t expires = time(0) + seconds;
- time_t timeLeft = seconds;
- fd_set rfds, wfds, efds;
- FD_ZERO(&rfds);
- FD_ZERO(&wfds);
- FD_ZERO(&efds);
-
- while(timeLeft >= 0) {
- zookeeper_interest(zh, &fd, &interest, &tv);
- if (fd != -1) {
- if (interest&ZOOKEEPER_READ) {
- FD_SET(fd, &rfds);
- } else {
- FD_CLR(fd, &rfds);
- }
- if (interest&ZOOKEEPER_WRITE) {
- FD_SET(fd, &wfds);
- } else {
- FD_CLR(fd, &wfds);
- }
- } else {
- fd = 0;
- }
- FD_SET(0, &rfds);
- if (tv.tv_sec > timeLeft) {
- tv.tv_sec = timeLeft;
- }
- rc = select(fd+1, &rfds, &wfds, &efds, &tv);
- timeLeft = expires - time(0);
- events = 0;
- if (FD_ISSET(fd, &rfds)) {
- events |= ZOOKEEPER_READ;
- }
- if (FD_ISSET(fd, &wfds)) {
- events |= ZOOKEEPER_WRITE;
- }
- zookeeper_process(zh, events);
- }
- }
-#endif
-
-typedef struct evt {
- string path;
- int type;
-} evt_t;
-
-typedef struct watchCtx {
-private:
- list<evt_t> events;
-public:
- bool connected;
- zhandle_t *zh;
- Mutex mutex;
-
- watchCtx() {
- connected = false;
- zh = 0;
- }
- ~watchCtx() {
- if (zh) {
- zookeeper_close(zh);
- zh = 0;
- }
- }
-
- evt_t getEvent() {
- evt_t evt;
- mutex.acquire();
- CPPUNIT_ASSERT( events.size() > 0);
- evt = events.front();
- events.pop_front();
- mutex.release();
- return evt;
- }
-
- int countEvents() {
- int count;
- mutex.acquire();
- count = events.size();
- mutex.release();
- return count;
- }
-
- void putEvent(evt_t evt) {
- mutex.acquire();
- events.push_back(evt);
- mutex.release();
- }
-
- bool waitForConnected(zhandle_t *zh) {
- time_t expires = time(0) + 10;
- while(!connected && time(0) < expires) {
- yield(zh, 1);
- }
- return connected;
- }
- bool waitForDisconnected(zhandle_t *zh) {
- time_t expires = time(0) + 15;
- while(connected && time(0) < expires) {
- yield(zh, 1);
- }
- return !connected;
- }
-} watchctx_t;
+#include "WatchUtil.h"
class Zookeeper_clientretry : public CPPUNIT_NS::TestFixture
{
Added: zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc?rev=1608620&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc (added)
+++ zookeeper/trunk/src/c/tests/TestReadOnlyClient.cc Mon Jul 7 21:43:44 2014
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include "CppAssertHelper.h"
+
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <zookeeper.h>
+
+#include "Util.h"
+#include "WatchUtil.h"
+
+class Zookeeper_readOnly : public CPPUNIT_NS::TestFixture {
+ CPPUNIT_TEST_SUITE(Zookeeper_readOnly);
+#ifdef THREADED
+ CPPUNIT_TEST(testReadOnly);
+#endif
+ CPPUNIT_TEST_SUITE_END();
+
+ static void watcher(zhandle_t* zh, int type, int state,
+ const char* path, void* v) {
+ watchctx_t *ctx = (watchctx_t*)v;
+
+ if (state==ZOO_CONNECTED_STATE || state==ZOO_READONLY_STATE) {
+ ctx->connected = true;
+ } else {
+ ctx->connected = false;
+ }
+ if (type != ZOO_SESSION_EVENT) {
+ evt_t evt;
+ evt.path = path;
+ evt.type = type;
+ ctx->putEvent(evt);
+ }
+ }
+
+ FILE *logfile;
+public:
+
+ Zookeeper_readOnly() {
+ logfile = openlogfile("Zookeeper_readOnly");
+ }
+
+ ~Zookeeper_readOnly() {
+ if (logfile) {
+ fflush(logfile);
+ fclose(logfile);
+ logfile = 0;
+ }
+ }
+
+ void setUp() {
+ zoo_set_log_stream(logfile);
+ }
+
+ void startReadOnly() {
+ char cmd[1024];
+ sprintf(cmd, "%s startReadOnly", ZKSERVER_CMD);
+ CPPUNIT_ASSERT(system(cmd) == 0);
+ }
+
+ void stopPeer() {
+ char cmd[1024];
+ sprintf(cmd, "%s stop", ZKSERVER_CMD);
+ CPPUNIT_ASSERT(system(cmd) == 0);
+ }
+
+ void testReadOnly() {
+ startReadOnly();
+ watchctx_t watch;
+ zhandle_t* zh = zookeeper_init("localhost:22181",
+ watcher,
+ 10000,
+ NULL,
+ &watch,
+ ZOO_READONLY);
+ watch.zh = zh;
+ CPPUNIT_ASSERT(zh != 0);
+ sleep(1);
+ int len = 1024;
+ char buf[len];
+ int res = zoo_get(zh, "/", 0, buf, &len, 0);
+ CPPUNIT_ASSERT_EQUAL((int)ZOK, res);
+
+ char path[1024];
+ res = zoo_create(zh, "/test", buf, 10, &ZOO_OPEN_ACL_UNSAFE, 0, path,
+ 512);
+ CPPUNIT_ASSERT_EQUAL((int)ZNOTREADONLY, res);
+ stopPeer();
+ }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_readOnly);
Added: zookeeper/trunk/src/c/tests/WatchUtil.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/WatchUtil.h?rev=1608620&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/WatchUtil.h (added)
+++ zookeeper/trunk/src/c/tests/WatchUtil.h Mon Jul 7 21:43:44 2014
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef WATCH_UTIL_H_
+#define WATCH_UTIL_H_
+
+#include <sys/select.h>
+#include <cstring>
+#include <list>
+
+using namespace std;
+
+#include "CollectionUtil.h"
+#include "ThreadingUtil.h"
+
+using namespace Util;
+
+#ifdef THREADED
+ static void yield(zhandle_t *zh, int i)
+ {
+ sleep(i);
+ }
+#else
+ static void yield(zhandle_t *zh, int seconds)
+ {
+ int fd;
+ int interest;
+ int events;
+ struct timeval tv;
+ int rc;
+ time_t expires = time(0) + seconds;
+ time_t timeLeft = seconds;
+ fd_set rfds, wfds, efds;
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_ZERO(&efds);
+
+ while(timeLeft >= 0) {
+ zookeeper_interest(zh, &fd, &interest, &tv);
+ if (fd != -1) {
+ if (interest&ZOOKEEPER_READ) {
+ FD_SET(fd, &rfds);
+ } else {
+ FD_CLR(fd, &rfds);
+ }
+ if (interest&ZOOKEEPER_WRITE) {
+ FD_SET(fd, &wfds);
+ } else {
+ FD_CLR(fd, &wfds);
+ }
+ } else {
+ fd = 0;
+ }
+ FD_SET(0, &rfds);
+ if (tv.tv_sec > timeLeft) {
+ tv.tv_sec = timeLeft;
+ }
+ rc = select(fd+1, &rfds, &wfds, &efds, &tv);
+ timeLeft = expires - time(0);
+ events = 0;
+ if (FD_ISSET(fd, &rfds)) {
+ events |= ZOOKEEPER_READ;
+ }
+ if (FD_ISSET(fd, &wfds)) {
+ events |= ZOOKEEPER_WRITE;
+ }
+ zookeeper_process(zh, events);
+ }
+ }
+#endif
+
+typedef struct evt {
+ string path;
+ int type;
+} evt_t;
+
+typedef struct watchCtx {
+private:
+ list<evt_t> events;
+ watchCtx(const watchCtx&);
+ watchCtx& operator=(const watchCtx&);
+public:
+ bool connected;
+ zhandle_t *zh;
+ Mutex mutex;
+
+ watchCtx() {
+ connected = false;
+ zh = 0;
+ }
+ ~watchCtx() {
+ if (zh) {
+ zookeeper_close(zh);
+ zh = 0;
+ }
+ }
+
+ evt_t getEvent() {
+ evt_t evt;
+ mutex.acquire();
+ CPPUNIT_ASSERT( events.size() > 0);
+ evt = events.front();
+ events.pop_front();
+ mutex.release();
+ return evt;
+ }
+
+ int countEvents() {
+ int count;
+ mutex.acquire();
+ count = events.size();
+ mutex.release();
+ return count;
+ }
+
+ void putEvent(evt_t evt) {
+ mutex.acquire();
+ events.push_back(evt);
+ mutex.release();
+ }
+
+ bool waitForConnected(zhandle_t *zh) {
+ time_t expires = time(0) + 10;
+ while(!connected && time(0) < expires) {
+ yield(zh, 1);
+ }
+ return connected;
+ }
+ bool waitForDisconnected(zhandle_t *zh) {
+ time_t expires = time(0) + 15;
+ while(connected && time(0) < expires) {
+ yield(zh, 1);
+ }
+ return !connected;
+ }
+} watchctx_t;
+
+#endif /*WATCH_UTIL_H_*/
Modified: zookeeper/trunk/src/c/tests/ZKMocks.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/ZKMocks.cc (original)
+++ zookeeper/trunk/src/c/tests/ZKMocks.cc Mon Jul 7 21:43:44 2014
@@ -32,101 +32,117 @@ using namespace std;
TestClientId testClientId;
const char* TestClientId::PASSWD="1234567890123456";
-HandshakeRequest* HandshakeRequest::parse(const std::string& buf){
+HandshakeRequest* HandshakeRequest::parse(const std::string& buf) {
auto_ptr<HandshakeRequest> req(new HandshakeRequest);
memcpy(&req->protocolVersion,buf.data(), sizeof(req->protocolVersion));
req->protocolVersion = htonl(req->protocolVersion);
-
+
int offset=sizeof(req->protocolVersion);
-
+
memcpy(&req->lastZxidSeen,buf.data()+offset,sizeof(req->lastZxidSeen));
req->lastZxidSeen = htonll(req->lastZxidSeen);
offset+=sizeof(req->lastZxidSeen);
-
+
memcpy(&req->timeOut,buf.data()+offset,sizeof(req->timeOut));
req->timeOut = htonl(req->timeOut);
offset+=sizeof(req->timeOut);
-
+
memcpy(&req->sessionId,buf.data()+offset,sizeof(req->sessionId));
req->sessionId = htonll(req->sessionId);
offset+=sizeof(req->sessionId);
-
+
memcpy(&req->passwd_len,buf.data()+offset,sizeof(req->passwd_len));
req->passwd_len = htonl(req->passwd_len);
offset+=sizeof(req->passwd_len);
-
+
memcpy(req->passwd,buf.data()+offset,sizeof(req->passwd));
+ offset+=sizeof(req->passwd);
+
+ memcpy(&req->readOnly,buf.data()+offset,sizeof(req->readOnly));
+
if(testClientId.client_id==req->sessionId &&
!memcmp(testClientId.passwd,req->passwd,sizeof(req->passwd)))
return req.release();
// the request didn't match -- may not be a handshake request after all
+
return 0;
}
// *****************************************************************************
// watcher action implementation
-void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx){
- if(zh==0 || ctx==0) return;
- WatcherAction* action=(WatcherAction*)ctx;
-
- if(type==ZOO_SESSION_EVENT){
- if(state==ZOO_EXPIRED_SESSION_STATE)
+void activeWatcher(zhandle_t *zh,
+ int type, int state, const char *path,void* ctx) {
+
+ if (zh == 0 || ctx == 0)
+ return;
+
+ WatcherAction* action = (WatcherAction *)ctx;
+
+ if (type == ZOO_SESSION_EVENT) {
+ if (state == ZOO_EXPIRED_SESSION_STATE)
action->onSessionExpired(zh);
- else if(state==ZOO_CONNECTING_STATE)
+ else if(state == ZOO_CONNECTING_STATE)
action->onConnectionLost(zh);
- else if(state==ZOO_CONNECTED_STATE)
+ else if(state == ZOO_CONNECTED_STATE)
action->onConnectionEstablished(zh);
- }else if(type==ZOO_CHANGED_EVENT)
+ } else if (type == ZOO_CHANGED_EVENT)
action->onNodeValueChanged(zh,path);
- else if(type==ZOO_DELETED_EVENT)
+ else if (type == ZOO_DELETED_EVENT)
action->onNodeDeleted(zh,path);
- else if(type==ZOO_CHILD_EVENT)
+ else if (type == ZOO_CHILD_EVENT)
action->onChildChanged(zh,path);
+
// TODO: implement for the rest of the event types
- // ...
- action->setWatcherTriggered();
+
+ action->setWatcherTriggered();
}
-SyncedBoolCondition WatcherAction::isWatcherTriggered() const{
+
+SyncedBoolCondition WatcherAction::isWatcherTriggered() const {
return SyncedBoolCondition(triggered_,mx_);
}
-// *****************************************************************************
// a set of async completion signatures
+
void asyncCompletion(int rc, ACL_vector *acl,Stat *stat, const void *data){
assert("Completion data is NULL"&&data);
static_cast<AsyncCompletion*>((void*)data)->aclCompl(rc,acl,stat);
}
-void asyncCompletion(int rc, const char *value, int len, const Stat *stat,
- const void *data){
+
+void asyncCompletion(int rc, const char *value, int len, const Stat *stat,
+ const void *data) {
assert("Completion data is NULL"&&data);
- static_cast<AsyncCompletion*>((void*)data)->dataCompl(rc,value,len,stat);
+ static_cast<AsyncCompletion*>((void*)data)->dataCompl(rc,value,len,stat);
}
-void asyncCompletion(int rc, const Stat *stat, const void *data){
+
+void asyncCompletion(int rc, const Stat *stat, const void *data) {
assert("Completion data is NULL"&&data);
static_cast<AsyncCompletion*>((void*)data)->statCompl(rc,stat);
}
-void asyncCompletion(int rc, const char *value, const void *data){
+
+void asyncCompletion(int rc, const char *value, const void *data) {
assert("Completion data is NULL"&&data);
static_cast<AsyncCompletion*>((void*)data)->stringCompl(rc,value);
}
-void asyncCompletion(int rc,const String_vector *strings, const void *data){
+
+void asyncCompletion(int rc,const String_vector *strings, const void *data) {
assert("Completion data is NULL"&&data);
static_cast<AsyncCompletion*>((void*)data)->stringsCompl(rc,strings);
}
-void asyncCompletion(int rc, const void *data){
+
+void asyncCompletion(int rc, const void *data) {
assert("Completion data is NULL"&&data);
static_cast<AsyncCompletion*>((void*)data)->voidCompl(rc);
}
-// *****************************************************************************
// a predicate implementation
bool IOThreadStopped::operator()() const{
#ifdef THREADED
adaptor_threads* adaptor=(adaptor_threads*)zh_->adaptor_priv;
return CheckedPthread::isTerminated(adaptor->io);
#else
- assert("IOThreadStopped predicate is only for use with THREADED client"&& false);
+ assert("IOThreadStopped predicate is only for use with THREADED client" &&
+ false);
return false;
#endif
}
@@ -169,7 +185,7 @@ Mock_activateWatcher* Mock_activateWatch
class ActivateWatcherWrapper: public Mock_activateWatcher{
public:
ActivateWatcherWrapper():ctx_(0),activated_(false){}
-
+
virtual void call(zhandle_t *zh, watcher_registration_t* reg, int rc){
CALL_REAL(activateWatcher,(zh, reg,rc));
synchronized(mx_);
@@ -178,13 +194,13 @@ public:
ctx_=0;
}
}
-
+
void setContext(void* ctx){
synchronized(mx_);
ctx_=ctx;
activated_=false;
}
-
+
SyncedBoolCondition isActivated() const{
return SyncedBoolCondition(activated_,mx_);
}
@@ -195,7 +211,7 @@ public:
WatcherActivationTracker::WatcherActivationTracker():
wrapper_(new ActivateWatcherWrapper)
-{
+{
}
WatcherActivationTracker::~WatcherActivationTracker(){
@@ -245,7 +261,8 @@ public:
DeliverWatchersWrapper(int type,int state,bool terminate):
type_(type),state_(state),
allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){}
- virtual void call(zhandle_t* zh,int type,int state, const char* path, watcher_object_list **list){
+ virtual void call(zhandle_t* zh, int type, int state,
+ const char* path, watcher_object_list **list) {
{
synchronized(mx_);
zh_=zh;
@@ -255,8 +272,8 @@ public:
if(type_==type && state_==state){
if(terminate_){
// prevent zhandle_t from being prematurely distroyed;
- // this will also ensure that zookeeper_close() cleanups the thread
- // resources by calling finish_adaptor()
+ // this will also ensure that zookeeper_close() cleanups the
+ // thread resources by calling finish_adaptor()
inc_ref_counter(zh,1);
terminateZookeeperThreads(zh);
}
@@ -302,7 +319,7 @@ WatcherDeliveryTracker::~WatcherDelivery
delete deliveryWrapper_;
}
-SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const{
+SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const {
return deliveryWrapper_->isDelivered();
}
@@ -310,7 +327,7 @@ void WatcherDeliveryTracker::resetDelive
deliveryWrapper_->resetDeliveryCounter();
}
-SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const{
+SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const {
return deliveryWrapper_->deliveryCounterEquals(expected);
}
@@ -327,6 +344,7 @@ string HandshakeResponse::toString() con
tmp=htonl(passwd_len);
buf.append((char*)&tmp,sizeof(tmp));
buf.append(passwd,sizeof(passwd));
+ buf.append(&readOnly,sizeof(readOnly));
// finally set the buffer length
tmp=htonl(buf.size()+sizeof(tmp));
buf.insert(0,(char*)&tmp, sizeof(tmp));
@@ -335,12 +353,12 @@ string HandshakeResponse::toString() con
string ZooGetResponse::toString() const{
oarchive* oa=create_buffer_oarchive();
-
+
ReplyHeader h = {xid_,1,ZOK};
serialize_ReplyHeader(oa, "hdr", &h);
-
+
GetDataResponse resp;
- char buf[1024];
+ char buf[1024];
assert("GetDataResponse is too long"&&data_.size()<=sizeof(buf));
resp.data.len=data_.size();
resp.data.buff=buf;
@@ -350,34 +368,34 @@ string ZooGetResponse::toString() const{
int32_t len=htonl(get_buffer_len(oa));
string res((char*)&len,sizeof(len));
res.append(get_buffer(oa),get_buffer_len(oa));
-
+
close_buffer_oarchive(&oa,1);
return res;
}
string ZooStatResponse::toString() const{
oarchive* oa=create_buffer_oarchive();
-
+
ReplyHeader h = {xid_,1,rc_};
serialize_ReplyHeader(oa, "hdr", &h);
-
+
SetDataResponse resp;
resp.stat=stat_;
serialize_SetDataResponse(oa, "reply", &resp);
int32_t len=htonl(get_buffer_len(oa));
string res((char*)&len,sizeof(len));
res.append(get_buffer(oa),get_buffer_len(oa));
-
+
close_buffer_oarchive(&oa,1);
return res;
}
string ZooGetChildrenResponse::toString() const{
oarchive* oa=create_buffer_oarchive();
-
+
ReplyHeader h = {xid_,1,rc_};
serialize_ReplyHeader(oa, "hdr", &h);
-
+
GetChildrenResponse resp;
// populate the string vector
allocate_String_vector(&resp.children,strings_.size());
@@ -385,11 +403,11 @@ string ZooGetChildrenResponse::toString(
resp.children.data[i]=strdup(strings_[i].c_str());
serialize_GetChildrenResponse(oa, "reply", &resp);
deallocate_GetChildrenResponse(&resp);
-
+
int32_t len=htonl(get_buffer_len(oa));
string res((char*)&len,sizeof(len));
res.append(get_buffer(oa),get_buffer_len(oa));
-
+
close_buffer_oarchive(&oa,1);
return res;
}
@@ -398,35 +416,35 @@ string ZNodeEvent::toString() const{
oarchive* oa=create_buffer_oarchive();
struct WatcherEvent evt = {type_,0,(char*)path_.c_str()};
struct ReplyHeader h = {WATCHER_EVENT_XID,0,ZOK };
-
+
serialize_ReplyHeader(oa, "hdr", &h);
serialize_WatcherEvent(oa, "event", &evt);
-
+
int32_t len=htonl(get_buffer_len(oa));
string res((char*)&len,sizeof(len));
res.append(get_buffer(oa),get_buffer_len(oa));
-
+
close_buffer_oarchive(&oa,1);
return res;
}
string PingResponse::toString() const{
oarchive* oa=create_buffer_oarchive();
-
+
ReplyHeader h = {PING_XID,1,ZOK};
serialize_ReplyHeader(oa, "hdr", &h);
-
+
int32_t len=htonl(get_buffer_len(oa));
string res((char*)&len,sizeof(len));
res.append(get_buffer(oa),get_buffer_len(oa));
-
+
close_buffer_oarchive(&oa,1);
return res;
}
//******************************************************************************
// Zookeeper server simulator
-//
+//
bool ZookeeperServer::hasMoreRecv() const{
return recvHasMore.get()!=0 || connectionLost;
}
@@ -467,7 +485,7 @@ void ZookeeperServer::notifyBufferSent(c
// handle the handshake
int64_t sessId=sessionExpired?req->sessionId+1:req->sessionId;
sessionExpired=false;
- addRecvResponse(new HandshakeResponse(sessId));
+ addRecvResponse(new HandshakeResponse(sessId));
return;
}
// not a connect request -- fall thru
@@ -491,7 +509,8 @@ void ZookeeperServer::notifyBufferSent(c
++closeSent;
return; // no reply for close requests
}
- // get the next response from the response queue and append it to the receive list
+ // get the next response from the response queue and append it to the
+ // receive list
Element e;
{
synchronized(respQMx);
@@ -507,7 +526,7 @@ void ZookeeperServer::notifyBufferSent(c
void forceConnected(zhandle_t* zh){
// simulate connected state
zh->state=ZOO_CONNECTED_STATE;
-
+
// Simulate we're connected to the first host in our host list
zh->fd=ZookeeperServer::FD;
assert(zh->addrs.count > 0);
@@ -515,8 +534,8 @@ void forceConnected(zhandle_t* zh){
zh->addrs.next++;
zh->input_buffer=0;
- gettimeofday(&zh->last_recv,0);
- gettimeofday(&zh->last_send,0);
+ gettimeofday(&zh->last_recv,0);
+ gettimeofday(&zh->last_send,0);
}
void terminateZookeeperThreads(zhandle_t* zh){
Modified: zookeeper/trunk/src/c/tests/ZKMocks.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZKMocks.h?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/ZKMocks.h (original)
+++ zookeeper/trunk/src/c/tests/ZKMocks.h Mon Jul 7 21:43:44 2014
@@ -303,8 +303,9 @@ public:
class HandshakeResponse: public Response
{
public:
- HandshakeResponse(int64_t sessId=1)
- :protocolVersion(1),timeOut(10000),sessionId(sessId),passwd_len(sizeof(passwd))
+ HandshakeResponse(int64_t sessId=1):
+ protocolVersion(1),timeOut(10000),sessionId(sessId),
+ passwd_len(sizeof(passwd)),readOnly(0)
{
memcpy(passwd,"1234567890123456",sizeof(passwd));
}
@@ -313,6 +314,7 @@ public:
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
+ char readOnly;
virtual std::string toString() const ;
};
Added: zookeeper/trunk/src/c/tests/quorum.cfg
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/quorum.cfg?rev=1608620&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/quorum.cfg (added)
+++ zookeeper/trunk/src/c/tests/quorum.cfg Mon Jul 7 21:43:44 2014
@@ -0,0 +1,8 @@
+tickTime=500
+initLimit=10
+syncLimit=5
+dataDir=/tmp/zkdata
+clientPort=22181
+server.1=localhost:22881:33881
+server.2=localhost:22882:33882
+server.3=localhost:22883:33883
Modified: zookeeper/trunk/src/c/tests/zkServer.sh
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/zkServer.sh?rev=1608620&r1=1608619&r2=1608620&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/zkServer.sh (original)
+++ zookeeper/trunk/src/c/tests/zkServer.sh Mon Jul 7 21:43:44 2014
@@ -21,7 +21,7 @@ ZOOPORT=22181
if [ "x$1" == "x" ]
then
- echo "USAGE: $0 startClean|start|stop hostPorts"
+ echo "USAGE: $0 startClean|start|startReadOnly|stop hostPorts"
exit 2
fi
@@ -151,6 +151,23 @@ start|startClean)
fi
;;
+startReadOnly)
+ if [ "x${base_dir}" == "x" ]
+ then
+ echo "this target is for unit tests only"
+ exit 2
+ else
+ mkdir -p /tmp/zkdata
+ rm -f /tmp/zkdata/myid && echo 1 > /tmp/zkdata/myid
+
+ # force read-only mode
+ java -cp "$CLASSPATH" -Dreadonlymode.enabled=true org.apache.zookeeper.server.quorum.QuorumPeerMain ${base_dir}/src/c/tests/quorum.cfg &> "${base_dir}/build/tmp/zk.log" &
+ pid=$!
+ echo -n $pid > "${base_dir}/build/tmp/zk.pid"
+ sleep 3 # wait until read-only server is up
+ fi
+
+ ;;
stop)
# Already killed above
;;