You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2013/03/07 07:01:50 UTC
svn commit: r1453693 [2/5] - in /zookeeper/trunk: ./ src/ src/c/
src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/cli/
src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zook...
Modified: zookeeper/trunk/src/c/src/cli.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/cli.c?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/cli.c (original)
+++ zookeeper/trunk/src/c/src/cli.c Thu Mar 7 06:01:49 2013
@@ -57,7 +57,7 @@ static int recvd=0;
static int shutdownThisThing=0;
-static __attribute__ ((unused)) void
+static __attribute__ ((unused)) void
printProfileInfo(struct timeval start, struct timeval end, int thres,
const char* msg)
{
@@ -159,10 +159,10 @@ void dumpStat(const struct Stat *stat) {
}
tctime = stat->ctime/1000;
tmtime = stat->mtime/1000;
-
+
ctime_r(&tmtime, tmtimes);
ctime_r(&tctime, tctimes);
-
+
fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
"\tmtime=%s\tmzxid=%llx\n"
"\tversion=%x\taversion=%x\n"
@@ -326,6 +326,9 @@ void processline(char *line) {
fprintf(stderr, " myid\n");
fprintf(stderr, " verbose\n");
fprintf(stderr, " addauth <id> <scheme>\n");
+ fprintf(stderr, " config\n");
+ fprintf(stderr, " reconfig [-file <path> | -members <serverId=host:port1:port2;port3>,... | "
+ " -add <serverId=host:port1:port2;port3>,... | -remove <serverId>,...] [-version <version>]\n");
fprintf(stderr, " quit\n");
fprintf(stderr, "\n");
fprintf(stderr, " prefix the command with the character 'a' to run the command asynchronously.\n");
@@ -347,12 +350,122 @@ void processline(char *line) {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
-
+
rc = zoo_aget(zh, line, 1, my_data_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
- } else if (startsWith(line, "set ")) {
+ } else if (strcmp(line, "config") == 0) {
+ gettimeofday(&startTime, 0);
+ rc = zoo_agetconfig(zh, 1, my_data_completion, strdup(ZOO_CONFIG_NODE));
+ if (rc) {
+ fprintf(stderr, "Error %d for %s\n", rc, line);
+ }
+ } else if (startsWith(line, "reconfig ")) {
+ line += 9;
+ int syntaxError = 0;
+
+ char* joining = NULL;
+ char* leaving = NULL;
+ char* members = NULL;
+ size_t members_size = 0;
+
+ int mode = 0; // 0 = not set, 1 = incremental, 2 = non-incremental
+ int64_t version = -1;
+
+ char *p = strtok (strdup(line)," ");
+
+ while (p != NULL) {
+ if (strcmp(p, "-add")==0) {
+ p = strtok (NULL," ");
+ if (mode == 2 || p == NULL) {
+ syntaxError = 1;
+ break;
+ }
+ mode = 1;
+ joining = strdup(p);
+ } else if (strcmp(p, "-remove")==0){
+ p = strtok (NULL," ");
+ if (mode == 2 || p == NULL) {
+ syntaxError = 1;
+ break;
+ }
+ mode = 1;
+ leaving = strdup(p);
+ } else if (strcmp(p, "-members")==0) {
+ p = strtok (NULL," ");
+ if (mode == 1 || p == NULL) {
+ syntaxError = 1;
+ break;
+ }
+ mode = 2;
+ members = strdup(p);
+ } else if (strcmp(p, "-file")==0){
+ p = strtok (NULL," ");
+ if (mode == 1 || p == NULL) {
+ syntaxError = 1;
+ break;
+ }
+ mode = 2;
+ FILE *fp = fopen(p, "r");
+ if (fp == NULL) {
+ fprintf(stderr, "Error reading file: %s\n", p);
+ syntaxError = 1;
+ break;
+ }
+ fseek(fp, 0L, SEEK_END); /* Position to end of file */
+ members_size = ftell(fp); /* Get file length */
+ rewind(fp); /* Back to start of file */
+ members = calloc(members_size + 1, sizeof(char));
+ if(members == NULL )
+ {
+ fprintf(stderr, "\nInsufficient memory to read file: %s\n", p);
+ syntaxError = 1;
+ fclose(fp);
+ break;
+ }
+
+ /* Read the entire file into members
+ * NOTE: -- fread returns number of items successfully read
+ * not the number of bytes. We're requesting one item of
+ * members_size bytes. So we expect the return value here
+ * to be 1.
+ */
+ if (fread(members, members_size, 1, fp) != 1){
+ fprintf(stderr, "Error reading file: %s\n", p);
+ syntaxError = 1;
+ fclose(fp);
+ break;
+ }
+ fclose(fp);
+ } else if (strcmp(p, "-version")==0){
+ p = strtok (NULL," ");
+ if (version != -1 || p == NULL){
+ syntaxError = 1;
+ break;
+ }
+ version = strtoull(p, NULL, 16);
+ if (version < 0) {
+ syntaxError = 1;
+ break;
+ }
+ } else {
+ syntaxError = 1;
+ break;
+ }
+ p = strtok (NULL," ");
+ }
+ if (syntaxError) return;
+
+ rc = zoo_areconfig(zh, joining, leaving, members, version, my_data_completion, strdup(line));
+ free(joining);
+ free(leaving);
+ free(members);
+ if (rc) {
+ fprintf(stderr, "Error %d for %s\n", rc, line);
+ }
+
+ } else if (startsWith(line, "set ")) {
char *ptr;
line += 4;
if (line[0] != '/') {
@@ -497,7 +610,7 @@ void processline(char *line) {
printf("session Id = %llx\n", _LL_CAST_ zoo_client_id(zh)->client_id);
} else if (strcmp(line, "reinit") == 0) {
zookeeper_close(zh);
- // we can't send myid to the server here -- zookeeper_close() removes
+ // we can't send myid to the server here -- zookeeper_close() removes
// the session on the server. We must start anew.
zh = zookeeper_init(hostPort, watcher, 30000, 0, 0, 0);
} else if (startsWith(line, "quit")) {
@@ -528,7 +641,7 @@ int main(int argc, char **argv) {
#endif
char buffer[4096];
char p[2048];
-#ifdef YCA
+#ifdef YCA
char *cert=0;
char appId[64];
#endif
@@ -656,7 +769,7 @@ int main(int argc, char **argv) {
processline(cmd);
processed=1;
}
- if (FD_ISSET(0, &rfds)) {
+ if (!processed && FD_ISSET(0, &rfds)) {
int rc;
int len = sizeof(buffer) - bufoff -1;
if (len <= 0) {
Modified: zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/src/zookeeper.c?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ zookeeper/trunk/src/c/src/zookeeper.c Thu Mar 7 06:01:49 2013
@@ -189,10 +189,10 @@ static int deserialize_multi(int xid, co
/* completion routine forward declarations */
static int add_completion(zhandle_t *zh, int xid, int completion_type,
- const void *dc, const void *data, int add_to_front,
+ const void *dc, const void *data, int add_to_front,
watcher_registration_t* wo, completion_head_t *clist);
static completion_list_t* create_completion_entry(int xid, int completion_type,
- const void *dc, const void *data, watcher_registration_t* wo,
+ const void *dc, const void *data, watcher_registration_t* wo,
completion_head_t *clist);
static void destroy_completion_entry(completion_list_t* c);
static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
@@ -438,12 +438,12 @@ static void setup_random()
#ifndef __CYGWIN__
/**
- * get the errno from the return code
+ * get the errno from the return code
* of getaddrinfo. Errno is not set
* with the call to getaddrinfo, so that's
* why we have to do this.
*/
-static int getaddrinfo_errno(int rc) {
+static int getaddrinfo_errno(int rc) {
switch(rc) {
case EAI_NONAME:
// ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
@@ -616,10 +616,10 @@ int resolve_hosts(const char *hosts_in,
if ((rc = getaddrinfo(host, port_spec, &hints, &res0)) != 0) {
//bug in getaddrinfo implementation when it returns
- //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
+ //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
// ai_flags as AI_ADDRCONFIG
#ifdef AI_ADDRCONFIG
- if ((hints.ai_flags == AI_ADDRCONFIG) &&
+ if ((hints.ai_flags == AI_ADDRCONFIG) &&
// ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
#ifdef EAI_ADDRFAMILY
((rc ==EAI_BADFLAGS) || (rc == EAI_ADDRFAMILY))) {
@@ -707,19 +707,19 @@ fail:
* a) the server this client is currently connected is not in new address list.
* Otherwise (if currentHost is in the new list):
* b) the number of servers in the cluster is increasing - in this case the load
- * on currentHost should decrease, which means that SOME of the clients
+ * on currentHost should decrease, which means that SOME of the clients
* connected to it will migrate to the new servers. The decision whether this
- * client migrates or not is probabilistic so that the expected number of
+ * client migrates or not is probabilistic so that the expected number of
* clients connected to each server is the same.
- *
- * If reconfig is set to true, the function sets pOld and pNew that correspond
+ *
+ * If reconfig is set to true, the function sets pOld and pNew that correspond
* to the probability to migrate to one of the new servers or one of the old
* servers (migrating to one of the old servers is done only if our client's
- * currentHost is not in new list).
- *
+ * currentHost is not in new list).
+ *
* See zoo_cycle_next_server for the selection logic.
- *
- * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
+ *
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
* protocol and its evaluation,
*/
int update_addrs(zhandle_t *zh)
@@ -741,7 +741,7 @@ int update_addrs(zhandle_t *zh)
if (zh->hostname == NULL)
{
return ZSYSTEMERROR;
- }
+ }
// NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
lock_reconfig(zh);
@@ -784,7 +784,7 @@ int update_addrs(zhandle_t *zh)
{
goto fail;
}
- }
+ }
else {
rc = addrvec_append(&zh->addrs_new, resolved_address);
if (rc != ZOK)
@@ -811,12 +811,12 @@ int update_addrs(zhandle_t *zh)
zh->reconfig = 0;
}
} else {
- // my server is not in the new config, and load on old servers must
+ // my server is not in the new config, and load on old servers must
// be decreased, so connect to one of the new servers
zh->pNew = 1;
zh->pOld = 0;
}
- }
+ }
// Number of servers stayed the same or decreased
else {
@@ -833,8 +833,8 @@ int update_addrs(zhandle_t *zh)
addrvec_free(&zh->addrs);
zh->addrs = resolved;
- // If we need to do a reconfig and we're currently connected to a server,
- // then force close that connection so on next interest() call we'll make a
+ // If we need to do a reconfig and we're currently connected to a server,
+ // then force close that connection so on next interest() call we'll make a
// new connection
if (zh->reconfig == 1 && zh->fd != -1)
{
@@ -847,7 +847,7 @@ fail:
unlock_reconfig(zh);
- // If we short-circuited out and never assigned resolved to zh->addrs then we
+ // If we short-circuited out and never assigned resolved to zh->addrs then we
// need to free resolved to avoid a memleak.
if (zh->addrs.data != resolved.data)
{
@@ -1092,15 +1092,15 @@ int zoo_set_servers(zhandle_t *zh, const
* we've updated the server list to connect to, and are now trying to find some
* server to connect to. Once we get successfully connected, 'reconfig' mode is
* set to false. Similarly, if we tried to connect to all servers in new config
- * and failed, 'reconfig' mode is set to false.
+ * and failed, 'reconfig' mode is set to false.
*
* While in 'reconfig' mode, we should connect to a server in the new set of
- * servers (addrs_new) with probability pNew and to servers in the old set of
+ * servers (addrs_new) with probability pNew and to servers in the old set of
* servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
- * out all servers in either, we continue to try servers from the other set,
+ * out all servers in either, we continue to try servers from the other set,
* regardless of pNew or pOld. If we tried all servers we give up and go back to
* the normal round robin mode
- *
+ *
* When called, must be protected by lock_reconfig(zh).
*/
static int get_next_server_in_reconfig(zhandle_t *zh)
@@ -1108,16 +1108,16 @@ static int get_next_server_in_reconfig(z
int take_new = drand48() <= zh->pNew;
LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
- zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
+ zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
addrvec_hasnext(&zh->addrs_old)));
LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
- zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
+ zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
addrvec_hasnext(&zh->addrs_new)));
// Take one of the new servers if we haven't tried them all yet
// and either the probability tells us to connect to one of the new servers
// or if we already tried them all then use one of the old servers
- if (addrvec_hasnext(&zh->addrs_new)
+ if (addrvec_hasnext(&zh->addrs_new)
&& (take_new || !addrvec_hasnext(&zh->addrs_old)))
{
addrvec_next(&zh->addrs_new, &zh->addr_cur);
@@ -1137,14 +1137,14 @@ static int get_next_server_in_reconfig(z
return 1;
}
-/**
+/**
* Cycle through our server list to the correct 'next' server. The 'next' server
* to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
* mode means we've updated the server list and are now trying to find a server
* to connect to. Once we get connected, we are no longer in the reconfig mode.
* Similarly, if we try to connect to all the servers in the new configuration
* and failed, reconfig mode is set to false.
- *
+ *
* For more algorithm details, see get_next_server_in_reconfig.
*/
void zoo_cycle_next_server(zhandle_t *zh)
@@ -1172,7 +1172,7 @@ void zoo_cycle_next_server(zhandle_t *zh
return;
}
-/**
+/**
* Get the host:port for the server we are currently connecting to or connected
* to. This is largely for testing purposes but is also generally useful for
* other client software built on top of this client.
@@ -1364,9 +1364,9 @@ static int send_buffer(int fd, buffer_li
if (rc == -1) {
#ifndef _WINDOWS
if (errno != EAGAIN) {
-#else
+#else
if (WSAGetLastError() != WSAEWOULDBLOCK) {
-#endif
+#endif
return -1;
} else {
return 0;
@@ -1383,9 +1383,9 @@ static int send_buffer(int fd, buffer_li
if (rc == -1) {
#ifndef _WINDOWS
if (errno != EAGAIN) {
-#else
+#else
if (WSAGetLastError() != WSAEWOULDBLOCK) {
-#endif
+#endif
return -1;
}
} else {
@@ -1670,7 +1670,7 @@ static int send_auth_info(zhandle_t *zh)
}
static int send_last_auth_info(zhandle_t *zh)
-{
+{
int rc = 0;
auth_info *auth = NULL;
@@ -1724,7 +1724,7 @@ static int send_set_watches(zhandle_t *z
/* add this buffer to the head of the send queue */
rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
- /* We queued the buffer, so don't free it */
+ /* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
@@ -1888,7 +1888,7 @@ int zookeeper_interest(zhandle_t *zh, in
gettimeofday(&now, 0);
if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
int time_left = calculate_interval(&zh->next_deadline, &now);
- int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
+ int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
(zh->recv_timeout / 10);
if (time_left > max_exceed)
LOG_WARN(("Exceeded deadline by %dms", time_left));
@@ -1907,7 +1907,7 @@ int zookeeper_interest(zhandle_t *zh, in
if (*fd == -1) {
- /*
+ /*
* If we previously failed to connect to server pool (zh->delay == 1)
* then we need to delay our connection on this iteration 1/60 of the
* recv timeout before trying again so we don't spin.
@@ -1921,7 +1921,7 @@ int zookeeper_interest(zhandle_t *zh, in
LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
zh->hostname));
- }
+ }
// No need to delay -- grab the next server and attempt connection
else {
@@ -1944,7 +1944,7 @@ int zookeeper_interest(zhandle_t *zh, in
LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
}
#ifdef WIN32
- ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
+ ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
#else
fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
#endif
@@ -2233,7 +2233,7 @@ static void process_sync_completion(
cptr->c.type, cptr->xid, sc->rc));
switch(cptr->c.type) {
- case COMPLETION_DATA:
+ case COMPLETION_DATA:
if (sc->rc==0) {
struct GetDataResponse res;
int len;
@@ -2289,7 +2289,7 @@ static void process_sync_completion(
const char * client_path;
deserialize_CreateResponse(ia, "reply", &res);
//ZOOKEEPER-1027
- client_path = sub_string(zh, res.path);
+ client_path = sub_string(zh, res.path);
len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
len = sc->u.str.str_len;
}
@@ -2672,7 +2672,7 @@ int zookeeper_process(zhandle_t *zh, int
*sc = (struct sync_completion*)cptr->data;
sc->rc = rc;
- process_sync_completion(cptr, sc, ia, zh);
+ process_sync_completion(cptr, sc, ia, zh);
notify_sync_completion(sc);
free_buffer(bptr);
@@ -2771,9 +2771,9 @@ static void destroy_completion_entry(com
}
}
-static void queue_completion_nolock(completion_head_t *list,
+static void queue_completion_nolock(completion_head_t *list,
completion_list_t *c,
- int add_to_front)
+ int add_to_front)
{
c->next = 0;
/* appending a new entry to the back of the list */
@@ -2990,7 +2990,7 @@ static int isValidPath(const char* path,
* REQUEST INIT HELPERS
*---------------------------------------------------------------------------*/
/* Common Request init helper functions to reduce code duplication */
-static int Request_path_init(zhandle_t *zh, int flags,
+static int Request_path_init(zhandle_t *zh, int flags,
char **path_out, const char *path)
{
assert(path_out);
@@ -3052,7 +3052,7 @@ int zoo_awget(zhandle_t *zh, const char
rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
enter_critical(zh);
rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
- create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
+ create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
leave_critical(zh);
@@ -3067,6 +3067,87 @@ int zoo_awget(zhandle_t *zh, const char
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
+int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
+ const void *data)
+{
+ return zoo_awgetconfig(zh,watch?zh->watcher:0,zh->context,dc,data);
+}
+
+int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
+ data_completion_t dc, const void *data)
+{
+ struct oarchive *oa;
+ char *path = ZOO_CONFIG_NODE;
+ char *server_path = ZOO_CONFIG_NODE;
+ struct RequestHeader h = { get_xid(), ZOO_GETDATA_OP };
+ struct GetDataRequest req = { (char*)server_path, watcher!=0 };
+ int rc;
+
+ if (zh==0 || !isValidPath(server_path, 0)) {
+ free_duplicate_path(server_path, path);
+ return ZBADARGUMENTS;
+ }
+ if (is_unrecoverable(zh)) {
+ free_duplicate_path(server_path, path);
+ return ZINVALIDSTATE;
+ }
+ oa=create_buffer_oarchive();
+ rc = serialize_RequestHeader(oa, "header", &h);
+ rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
+ enter_critical(zh);
+ rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
+ create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
+ rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+ get_buffer_len(oa));
+ leave_critical(zh);
+ free_duplicate_path(server_path, path);
+ /* We queued the buffer, so don't free it */
+ close_buffer_oarchive(&oa, 0);
+
+ LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
+ zoo_get_current_server(zh)));
+ /* make a best (non-blocking) effort to send the requests asap */
+ adaptor_send_queue(zh, 0);
+ return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+}
+
+int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
+ const char *members, int64_t version, data_completion_t dc, const void *data)
+{
+ struct oarchive *oa;
+ struct RequestHeader h = { get_xid(), ZOO_RECONFIG_OP };
+ struct ReconfigRequest req;
+ int rc = 0;
+
+ if (zh==0) {
+ return ZBADARGUMENTS;
+ }
+ if (is_unrecoverable(zh)) {
+ return ZINVALIDSTATE;
+ }
+
+ oa=create_buffer_oarchive();
+ req.joiningServers = (char *)joining;
+ req.leavingServers = (char *)leaving;
+ req.newMembers = (char *)members;
+ req.curConfigId = version;
+ rc = serialize_RequestHeader(oa, "header", &h);
+ rc = rc < 0 ? rc : serialize_ReconfigRequest(oa, "req", &req);
+ enter_critical(zh);
+ rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data, NULL);
+ rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+ get_buffer_len(oa));
+ leave_critical(zh);
+ /* We queued the buffer, so don't free it */
+ close_buffer_oarchive(&oa, 0);
+
+ LOG_DEBUG(("Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh)));
+ /* make a best (non-blocking) effort to send the requests asap */
+ adaptor_send_queue(zh, 0);
+
+ return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+}
+
static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
const char *path, const char *buffer, int buflen, int version)
{
@@ -3223,7 +3304,7 @@ int zoo_acreate2(zhandle_t *zh, const ch
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
-int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
+int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
const char *path, int version)
{
int rc = Request_path_init(zh, 0, &req->path, path);
@@ -3526,7 +3607,7 @@ static void op_result_stat_completion(in
} else {
result->stat = NULL ;
}
-}
+}
static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
const char *path, int version)
@@ -3565,16 +3646,16 @@ int zoo_amulti(zhandle_t *zh, int count,
case ZOO_CREATE_OP: {
struct CreateRequest req;
- rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
- op->create_op.path, op->create_op.data,
- op->create_op.datalen, op->create_op.acl,
+ rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
+ op->create_op.path, op->create_op.data,
+ op->create_op.datalen, op->create_op.acl,
op->create_op.flags);
rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
result->value = op->create_op.buf;
result->valuelen = op->create_op.buflen;
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
+ entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->create_op.path);
break;
@@ -3586,7 +3667,7 @@ int zoo_amulti(zhandle_t *zh, int count,
rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+ entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->delete_op.path);
break;
@@ -3595,13 +3676,13 @@ int zoo_amulti(zhandle_t *zh, int count,
case ZOO_SETDATA_OP: {
struct SetDataRequest req;
rc = rc < 0 ? rc : SetDataRequest_init(zh, &req,
- op->set_op.path, op->set_op.data,
+ op->set_op.path, op->set_op.data,
op->set_op.datalen, op->set_op.version);
rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
result->stat = op->set_op.stat;
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
+ entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->set_op.path);
break;
@@ -3614,15 +3695,15 @@ int zoo_amulti(zhandle_t *zh, int count,
rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);
enter_critical(zh);
- entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+ entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
leave_critical(zh);
free_duplicate_path(req.path, op->check_op.path);
break;
- }
+ }
default:
LOG_ERROR(("Unimplemented sub-op type=%d in multi-op", op->type));
- return ZUNIMPLEMENTED;
+ return ZUNIMPLEMENTED;
}
queue_completion(&clist, entry, 0);
@@ -3649,7 +3730,7 @@ int zoo_amulti(zhandle_t *zh, int count,
}
void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
- int valuelen, const struct ACL_vector *acl, int flags,
+ int valuelen, const struct ACL_vector *acl, int flags,
char *path_buffer, int path_buffer_len)
{
assert(op);
@@ -3686,7 +3767,7 @@ void zoo_delete_op_init(zoo_op_t *op, co
op->delete_op.version = version;
}
-void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
+void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
int buflen, int version, struct Stat *stat)
{
assert(op);
@@ -3732,7 +3813,7 @@ int flush_send_queue(zhandle_t*zh, int t
int rc= ZOK;
struct timeval started;
#ifdef WIN32
- fd_set pollSet;
+ fd_set pollSet;
struct timeval wait;
#endif
gettimeofday(&started,0);
@@ -3757,7 +3838,7 @@ int flush_send_queue(zhandle_t*zh, int t
FD_ZERO(&pollSet);
FD_SET(zh->fd, &pollSet);
// Poll the socket
- rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
+ rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
#else
struct pollfd fds;
fds.fd = zh->fd;
@@ -3843,6 +3924,10 @@ const char* zerror(int c)
return "(not error) no server responses to process";
case ZSESSIONMOVED:
return "session moved to another server, so operation is ignored";
+ case ZNEWCONFIGNOQUORUM:
+ return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
+ case ZRECONFIGINPROGRESS:
+ return "Another reconfiguration is in progress -- concurrent reconfigs not supported (yet)";
}
if (c > 0) {
return strerror(c);
@@ -3862,7 +3947,7 @@ int zoo_add_auth(zhandle_t *zh,const cha
return ZINVALIDSTATE;
// [ZOOKEEPER-800] zoo_add_auth should return ZINVALIDSTATE if
- // the connection is closed.
+ // the connection is closed.
if (zoo_state(zh) == 0) {
return ZINVALIDSTATE;
}
@@ -3919,12 +4004,12 @@ static const char* format_endpoint_info(
}
#endif
#ifdef WIN32
- addrstring = inet_ntoa (*(struct in_addr*)inaddr);
+ addrstring = inet_ntoa (*(struct in_addr*)inaddr);
sprintf(buf,"%s:%d",addrstring,ntohs(port));
#else
inet_ntop(ep->ss_family,inaddr,addrstr,sizeof(addrstr)-1);
sprintf(buf,"%s:%d",addrstr,ntohs(port));
-#endif
+#endif
return buf;
}
@@ -4056,6 +4141,48 @@ int zoo_wget(zhandle_t *zh, const char *
return rc;
}
+int zoo_getconfig(zhandle_t *zh, int watch, char *buffer,
+ int* buffer_len, struct Stat *stat)
+{
+ return zoo_wget(zh,ZOO_CONFIG_NODE,watch?zh->watcher:0,zh->context, buffer,buffer_len,stat);
+}
+
+int zoo_wgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
+ char *buffer, int* buffer_len, struct Stat *stat)
+{
+ return zoo_wget(zh, ZOO_CONFIG_NODE, watcher, watcherCtx, buffer, buffer_len, stat);
+}
+
+
+int zoo_reconfig(zhandle_t *zh, const char *joining, const char *leaving,
+ const char *members, int64_t version, char *buffer, int* buffer_len,
+ struct Stat *stat)
+{
+ struct sync_completion *sc;
+ int rc=0;
+
+ if(buffer_len==NULL)
+ return ZBADARGUMENTS;
+ if((sc=alloc_sync_completion())==NULL)
+ return ZSYSTEMERROR;
+
+ sc->u.data.buffer = buffer;
+ sc->u.data.buff_len = *buffer_len;
+ rc=zoo_areconfig(zh, joining, leaving, members, version, SYNCHRONOUS_MARKER, sc);
+
+ if(rc==ZOK){
+ wait_sync_completion(sc);
+ rc = sc->rc;
+ if (rc == 0) {
+ if(stat)
+ *stat = sc->u.data.stat;
+ *buffer_len = sc->u.data.buff_len;
+ }
+ }
+ free_sync_completion(sc);
+ return rc;
+}
+
int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
int version)
{
Added: zookeeper/trunk/src/c/tests/TestReconfigServer.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReconfigServer.cc?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReconfigServer.cc (added)
+++ zookeeper/trunk/src/c/tests/TestReconfigServer.cc Thu Mar 7 06:01:49 2013
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+#include <algorithm>
+#include <cppunit/extensions/HelperMacros.h>
+#include "zookeeper.h"
+
+#include "Util.h"
+#include "ZooKeeperQuorumServer.h"
+
+// CppUnit fixture that calls zoo_getconfig/zoo_reconfig against a cluster of
+// NUM_SERVERS ZooKeeperQuorumServer instances created in setUp().
+class TestReconfigServer : public CPPUNIT_NS::TestFixture {
+ CPPUNIT_TEST_SUITE(TestReconfigServer);
+// The test cases are registered only when THREADED is defined.
+#ifdef THREADED
+ CPPUNIT_TEST(testNonIncremental);
+ CPPUNIT_TEST(testRemoveConnectedFollower);
+ CPPUNIT_TEST(testRemoveFollower);
+#endif
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ TestReconfigServer();
+ virtual ~TestReconfigServer();
+ // Creates the cluster and sleeps briefly so it can start.
+ void setUp();
+ // Deletes every server in the cluster.
+ void tearDown();
+ void testNonIncremental();
+ void testRemoveConnectedFollower();
+ void testRemoveFollower();
+
+ private:
+ // Number of servers requested from ZooKeeperQuorumServer::getCluster().
+ static const uint32_t NUM_SERVERS;
+ // Log stream opened in the constructor and closed in the destructor.
+ FILE* logfile_;
+ // Servers created by setUp() and deleted by tearDown().
+ std::vector<ZooKeeperQuorumServer*> cluster_;
+ // Index in cluster_ of the first server reporting isLeader(), or -1.
+ int32_t getLeader();
+ // Indices in cluster_ of all servers reporting isFollower().
+ std::vector<int32_t> getFollowers();
+ // Splits config text into its "server." lines and its "version=" value.
+ void parseConfig(char* buf, int len, std::vector<std::string>& servers,
+ std::string& version);
+};
+
+const uint32_t TestReconfigServer::NUM_SERVERS = 3;
+
+// Opens the "TestReconfigServer" log file and hands it to the client library
+// as its log stream.
+TestReconfigServer::
+TestReconfigServer() {
+    logfile_ = openlogfile("TestReconfigServer");
+    zoo_set_log_stream(logfile_);
+}
+
+// Flushes and closes the log file, if one was opened.
+TestReconfigServer::
+~TestReconfigServer() {
+    if (logfile_ == NULL) {
+        return;
+    }
+    fflush(logfile_);
+    fclose(logfile_);
+    logfile_ = NULL;
+}
+
+// Creates a fresh cluster of NUM_SERVERS servers before each test.
+void TestReconfigServer::
+setUp() {
+ cluster_ = ZooKeeperQuorumServer::getCluster(NUM_SERVERS);
+ // give the cluster some time to start up.
+ // NOTE(review): a fixed 2 second wait; presumably long enough for the
+ // servers to elect a leader on the test machine - confirm.
+ sleep(2);
+}
+
+// Deletes every server created by setUp() and empties the cluster vector.
+void TestReconfigServer::
+tearDown() {
+    typedef std::vector<ZooKeeperQuorumServer*>::iterator ServerIter;
+    for (ServerIter it = cluster_.begin(); it != cluster_.end(); ++it) {
+        delete *it;
+    }
+    cluster_.clear();
+}
+
+// Returns the index in cluster_ of the first server whose isLeader() is
+// true, or -1 when no server reports itself as leader.
+int32_t TestReconfigServer::
+getLeader() {
+    size_t idx = 0;
+    // Stop at the first leader so later servers are not queried.
+    while (idx < cluster_.size() && !cluster_[idx]->isLeader()) {
+        ++idx;
+    }
+    if (idx < cluster_.size()) {
+        return static_cast<int32_t>(idx);
+    }
+    return -1;
+}
+
+// Returns the indices in cluster_ of every server whose isFollower() is
+// true, in ascending order.
+std::vector<int32_t> TestReconfigServer::
+getFollowers() {
+    std::vector<int32_t> result;
+    for (size_t idx = 0; idx < cluster_.size(); ++idx) {
+        if (!cluster_[idx]->isFollower()) {
+            continue;
+        }
+        result.push_back(static_cast<int32_t>(idx));
+    }
+    return result;
+}
+
+// Parses newline-separated config text held in the first len bytes of buf.
+// servers is cleared and then receives every line starting with "server.".
+// version receives the text after "version=" of any such line and is left
+// untouched when no such line is present.
+void TestReconfigServer::
+parseConfig(char* buf, int len, std::vector<std::string>& servers,
+            std::string& version) {
+    const std::string kServerPrefix("server.");
+    const std::string kVersionPrefix("version=");
+
+    servers.clear();
+
+    std::stringstream input(std::string(buf, len));
+    std::string entry;
+    while (std::getline(input, entry, '\n')) {
+        if (entry.compare(0, kServerPrefix.size(), kServerPrefix) == 0) {
+            servers.push_back(entry);
+            continue;
+        }
+        if (entry.compare(0, kVersionPrefix.size(), kVersionPrefix) == 0) {
+            version = entry.substr(kVersionPrefix.size());
+        }
+    }
+}
+
+/**
+ * 1. Connect to the leader.
+ * 2. Remove a follower using incremental reconfig.
+ * 3. Add the follower back using incremental reconfig.
+ */
+void TestReconfigServer::
+testRemoveFollower() {
+    std::vector<std::string> servers;
+    std::string version;
+    struct Stat stat;
+    // Fixed-size array: a runtime-sized "char buf[len]" is not standard C++.
+    char buf[1024];
+    int len = sizeof(buf);
+
+    // get config from leader.
+    int32_t leader = getLeader();
+    CPPUNIT_ASSERT(leader >= 0);
+    std::string host = cluster_[leader]->getHostPort();
+    zhandle_t* zk = zookeeper_init(host.c_str(), NULL, 10000, NULL, NULL, 0);
+    // Fail here rather than handing a NULL handle to the calls below.
+    CPPUNIT_ASSERT(zk != NULL);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, zoo_getconfig(zk, 0, buf, &len, &stat));
+
+    // check if all the servers are listed in the config.
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("0"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // remove a follower.
+    std::vector<int32_t> followers = getFollowers();
+    // len is both the capacity passed in and the length written back, so it
+    // is reset to the full capacity before every call.
+    len = sizeof(buf);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1,
+                         (uint32_t)(followers.size()));
+    std::stringstream ss;
+    ss << followers[0];
+    int rc = zoo_reconfig(zk, NULL, ss.str().c_str(), NULL, -1, buf, &len,
+                          &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("100000002"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        if (static_cast<int32_t>(i) == followers[0]) {
+            continue;
+        }
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // add the follower back.
+    len = sizeof(buf);
+    std::string serverString = cluster_[followers[0]]->getServerString();
+    rc = zoo_reconfig(zk, serverString.c_str(), NULL, NULL, -1, buf, &len,
+                      &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+    zookeeper_close(zk);
+}
+
+/**
+ * 1. Connect to the leader.
+ * 2. Remove a follower using non-incremental reconfig.
+ * 3. Add the follower back using non-incremental reconfig.
+ */
+void TestReconfigServer::
+testNonIncremental() {
+    std::vector<std::string> servers;
+    std::string version;
+    struct Stat stat;
+    // Fixed-size array: a runtime-sized "char buf[len]" is not standard C++.
+    char buf[1024];
+    int len = sizeof(buf);
+
+    // get config from leader.
+    int32_t leader = getLeader();
+    CPPUNIT_ASSERT(leader >= 0);
+    std::string host = cluster_[leader]->getHostPort();
+    zhandle_t* zk = zookeeper_init(host.c_str(), NULL, 10000, NULL, NULL, 0);
+    // Fail here rather than handing a NULL handle to the calls below.
+    CPPUNIT_ASSERT(zk != NULL);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, zoo_getconfig(zk, 0, buf, &len, &stat));
+
+    // check if all the servers are listed in the config.
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("0"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // remove a follower.
+    std::vector<int32_t> followers = getFollowers();
+    // len is both the capacity passed in and the length written back, so it
+    // is reset to the full capacity before every call.
+    len = sizeof(buf);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1,
+                         (uint32_t)(followers.size()));
+    // New membership: every follower except followers[0], plus the leader.
+    std::stringstream ss;
+    for (size_t i = 1; i < followers.size(); i++) {
+        ss << cluster_[followers[i]]->getServerString() << ",";
+    }
+    ss << cluster_[leader]->getServerString();
+
+    int rc = zoo_reconfig(zk, NULL, NULL, ss.str().c_str(), -1, buf, &len,
+                          &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(std::string("100000002"), version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        if (static_cast<int32_t>(i) == followers[0]) {
+            continue;
+        }
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+
+    // add the follower back.
+    len = sizeof(buf);
+    ss.str("");
+    // NOTE(review): this list ends with a trailing comma, unlike the list
+    // built above; presumably the server tolerates it - confirm.
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        ss << cluster_[i]->getServerString() << ",";
+    }
+    rc = zoo_reconfig(zk, NULL, NULL, ss.str().c_str(), -1, buf, &len,
+                      &stat);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+    zookeeper_close(zk);
+}
+
+/**
+ * 1. Connect to a follower.
+ * 2. Remove the follower the client is connected to.
+ */
+void TestReconfigServer::
+testRemoveConnectedFollower() {
+    std::vector<std::string> servers;
+    std::string version;
+    struct Stat stat;
+    // Fixed-size array: a runtime-sized "char buf[len]" is not standard C++.
+    char buf[1024];
+    int len = sizeof(buf);
+
+    // connect to a follower.
+    int32_t leader = getLeader();
+    std::vector<int32_t> followers = getFollowers();
+    CPPUNIT_ASSERT(leader >= 0);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(followers.size()));
+    // Host list: followers first, leader last.
+    std::stringstream ss;
+    for (size_t i = 0; i < followers.size(); i++) {
+        ss << cluster_[followers[i]]->getHostPort() << ",";
+    }
+    ss << cluster_[leader]->getHostPort();
+    std::string hosts = ss.str();
+    zoo_deterministic_conn_order(true);
+    zhandle_t* zk = zookeeper_init(hosts.c_str(), NULL, 10000, NULL, NULL, 0);
+    // Fail here rather than handing a NULL handle to the calls below.
+    CPPUNIT_ASSERT(zk != NULL);
+    std::string connectedHost(zoo_get_current_server(zk));
+    std::string portString = connectedHost.substr(connectedHost.find(":") + 1);
+    // Initialised so a failed parse compares as 0 instead of garbage.
+    uint32_t port = 0;
+    std::istringstream (portString) >> port;
+    CPPUNIT_ASSERT_EQUAL(cluster_[followers[0]]->getClientPort(), port);
+
+    // remove the follower.
+    len = sizeof(buf);
+    ss.str("");
+    ss << followers[0];
+    // NOTE(review): the result is deliberately not asserted, as in the
+    // original; presumably the call may fail because the connected server is
+    // the one being removed - confirm.
+    (void)zoo_reconfig(zk, NULL, ss.str().c_str(), NULL, -1, buf, &len, &stat);
+    // A successful zoo_reconfig overwrites len with the data length, so
+    // restore the full capacity before reading the config again.
+    len = sizeof(buf);
+    CPPUNIT_ASSERT_EQUAL((int)ZOK, zoo_getconfig(zk, 0, buf, &len, &stat));
+    parseConfig(buf, len, servers, version);
+    CPPUNIT_ASSERT_EQUAL(NUM_SERVERS - 1, (uint32_t)(servers.size()));
+    for (size_t i = 0; i < cluster_.size(); i++) {
+        if (static_cast<int32_t>(i) == followers[0]) {
+            continue;
+        }
+        CPPUNIT_ASSERT(std::find(servers.begin(), servers.end(),
+            cluster_[i]->getServerString()) != servers.end());
+    }
+    // Release the client handle, as the other two tests do.
+    zookeeper_close(zk);
+}
+
+CPPUNIT_TEST_SUITE_REGISTRATION(TestReconfigServer);
Added: zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc (added)
+++ zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.cc Thu Mar 7 06:01:49 2013
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+#include "ZooKeeperQuorumServer.h"
+
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <sstream>
+
+// Creates server number id of a numServers-member quorum: reads the
+// installation root from the ZKROOT environment variable, writes the config
+// file and data directory, then starts the server.
+ZooKeeperQuorumServer::
+ZooKeeperQuorumServer(uint32_t id, uint32_t numServers) :
+    // Listed in member declaration order (numServers_, then id_).
+    numServers_(numServers),
+    id_(id) {
+    const char* root = getenv("ZKROOT");
+    if (root == NULL) {
+        // Abort explicitly: assert() is compiled out under NDEBUG, and
+        // assigning NULL to a std::string is undefined behaviour.
+        fprintf(stderr, "Environment variable 'ZKROOT' is not set\n");
+        abort();
+    }
+    root_ = root;
+    createConfigFile();
+    createDataDirectory();
+    start();
+}
+
+// Stops the server process when the object is destroyed.
+ZooKeeperQuorumServer::
+~ZooKeeperQuorumServer() {
+ stop();
+}
+
+// Returns "localhost:<client port>" for this server.
+std::string ZooKeeperQuorumServer::
+getHostPort() {
+    std::ostringstream hostPort;
+    hostPort << "localhost:";
+    hostPort << getClientPort();
+    return hostPort.str();
+}
+
+// Returns the client port: CLIENT_PORT_BASE offset by this server's id.
+uint32_t ZooKeeperQuorumServer::
+getClientPort() {
+    const uint32_t clientPort = CLIENT_PORT_BASE + id_;
+    return clientPort;
+}
+
+// Runs "<root>/bin/zkServer.sh start <config file>" and asserts that the
+// command exited with status 0.
+void ZooKeeperQuorumServer::
+start() {
+    std::string command = root_ + "/bin/zkServer.sh start " +
+                          getConfigFileName();
+    // Run the command outside assert(): an assert() argument is not
+    // evaluated under NDEBUG, so the server would never be started.
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+}
+
+// Runs "<root>/bin/zkServer.sh stop <config file>" and asserts that the
+// command exited with status 0.
+void ZooKeeperQuorumServer::
+stop() {
+    std::string command = root_ + "/bin/zkServer.sh stop " +
+                          getConfigFileName();
+    // Run the command outside assert(): an assert() argument is not
+    // evaluated under NDEBUG, so the server would be left running.
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+}
+
+// Runs "<root>/bin/zkServer.sh status <config file>", captures its standard
+// output and returns "leader" or "follower" depending on which "Mode: ..."
+// text the output contains. Any other output is printed and the process is
+// aborted.
+std::string ZooKeeperQuorumServer::
+getMode() {
+    char buf[1024];
+    std::string result;
+    std::string command = root_ + "/bin/zkServer.sh status " +
+                          getConfigFileName();
+    FILE* output = popen(command.c_str(), "r");
+    if (output == NULL) {
+        // popen() can fail; reading from a NULL stream would crash.
+        fprintf(stderr, "popen failed: %s\n", command.c_str());
+        abort();
+    }
+    // Loop on the fgets() result rather than feof(): after a read error
+    // fgets() returns NULL without setting EOF, and a feof() loop never ends.
+    while (fgets(buf, sizeof(buf), output) != NULL) {
+        result += buf;
+    }
+    pclose(output);
+    if (result.find("Mode: leader") != std::string::npos) {
+        return "leader";
+    }
+    if (result.find("Mode: follower") != std::string::npos) {
+        return "follower";
+    }
+    printf("%s\n", result.c_str());
+    assert(!"unknown mode");
+    // Reached only under NDEBUG: never fall off the end of a function that
+    // returns a value.
+    abort();
+}
+
+// True when getMode() reports "leader".
+bool ZooKeeperQuorumServer::
+isLeader() {
+    const std::string mode = getMode();
+    return mode == "leader";
+}
+
+// True when getMode() reports "follower".
+bool ZooKeeperQuorumServer::
+isFollower() {
+    const std::string mode = getMode();
+    return mode == "follower";
+}
+
+// Creates the conf directory under <root>/build/test/test-cppunit and writes
+// this server's <id>.conf: timing settings, client port, data directory and
+// one server line per quorum member.
+void ZooKeeperQuorumServer::
+createConfigFile() {
+    std::string command = "mkdir -p " + root_ + "/build/test/test-cppunit/conf";
+    // Run the command outside assert(): an assert() argument is not
+    // evaluated under NDEBUG, so the directory would never be created.
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+    // getConfigFileName() builds the same path the start/stop/status
+    // commands hand to zkServer.sh.
+    std::string fileName = getConfigFileName();
+    std::ofstream confFile;
+    confFile.open(fileName.c_str());
+    confFile << "tickTime=2000\n";
+    confFile << "clientPort=" << getClientPort() << "\n";
+    confFile << "initLimit=5\n";
+    confFile << "syncLimit=2\n";
+    confFile << "dataDir=" << getDataDirectory() << "\n";
+    for (uint32_t i = 0; i < numServers_; i++) {
+        confFile << getServerString(i) << "\n";
+    }
+    confFile.close();
+}
+
+// Returns "<root>/build/test/test-cppunit/conf/<id>.conf".
+std::string ZooKeeperQuorumServer::
+getConfigFileName() {
+    std::ostringstream baseName;
+    baseName << id_ << ".conf";
+    std::string path = root_;
+    path += "/build/test/test-cppunit/conf/";
+    path += baseName.str();
+    return path;
+}
+
+// Recreates this server's data directory from scratch, writes its id to
+// "<data dir>/myid", and sets ZOO_LOG_DIR to the data directory.
+void ZooKeeperQuorumServer::
+createDataDirectory() {
+    std::string dataDirectory = getDataDirectory();
+    // Both commands run outside assert(): an assert() argument is not
+    // evaluated under NDEBUG, so the directory would be neither wiped nor
+    // created.
+    std::string command = "rm -rf " + dataDirectory;
+    int rc = system(command.c_str());
+    assert(rc == 0);
+    command = "mkdir -p " + dataDirectory;
+    rc = system(command.c_str());
+    assert(rc == 0);
+    (void)rc;
+    std::ofstream myidFile;
+    std::string fileName = dataDirectory + "/myid";
+    myidFile.open(fileName.c_str());
+    myidFile << id_ << "\n";
+    myidFile.close();
+    // Overwrites any existing ZOO_LOG_DIR in this process's environment.
+    setenv("ZOO_LOG_DIR", dataDirectory.c_str(), true);
+}
+
+// Returns the "server.<id>=..." config line for this server's own id.
+std::string ZooKeeperQuorumServer::
+getServerString() {
+ return getServerString(id_);
+}
+
+// Builds the config line for the server with the given id:
+// "server.<id>=localhost:<p1>:<p2>:participant;localhost:<p3>", where
+// p1, p2 and p3 are SERVER_PORT_BASE, ELECTION_PORT_BASE and
+// CLIENT_PORT_BASE each offset by id.
+std::string ZooKeeperQuorumServer::
+getServerString(uint32_t id) {
+    const uint32_t serverPort = SERVER_PORT_BASE + id;
+    const uint32_t electionPort = ELECTION_PORT_BASE + id;
+    const uint32_t clientPort = CLIENT_PORT_BASE + id;
+
+    std::stringstream line;
+    line << "server." << id;
+    line << "=localhost:" << serverPort;
+    line << ":" << electionPort;
+    line << ":participant;localhost:" << clientPort;
+    return line.str();
+}
+
+// Returns "<root>/build/test/test-cppunit/data<id>".
+std::string ZooKeeperQuorumServer::
+getDataDirectory() {
+    std::ostringstream dirName;
+    dirName << "data" << id_;
+    std::string path = root_;
+    path += "/build/test/test-cppunit/";
+    path += dirName.str();
+    return path;
+}
+
+// Creates numServers servers with ids 0 .. numServers-1 and returns them in
+// id order. The returned pointers come from new; the test fixture deletes
+// them in its tearDown().
+std::vector<ZooKeeperQuorumServer*> ZooKeeperQuorumServer::
+getCluster(uint32_t numServers) {
+    std::vector<ZooKeeperQuorumServer*> servers;
+    uint32_t nextId = 0;
+    while (nextId < numServers) {
+        ZooKeeperQuorumServer* server =
+            new ZooKeeperQuorumServer(nextId, numServers);
+        servers.push_back(server);
+        ++nextId;
+    }
+    return servers;
+}
Added: zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h (added)
+++ zookeeper/trunk/src/c/tests/ZooKeeperQuorumServer.h Thu Mar 7 06:01:49 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+#ifndef ZOOKEEPER_QUORUM_SERVER_H
+#define ZOOKEEPER_QUORUM_SERVER_H
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+// Test helper that manages one ZooKeeper server of a local quorum by
+// invoking <ZKROOT>/bin/zkServer.sh. Instances are created through
+// getCluster(); the constructors are private.
+class ZooKeeperQuorumServer {
+ public:
+ // Stops the server.
+ ~ZooKeeperQuorumServer();
+ // Creates and starts numServers servers with ids 0 .. numServers-1.
+ static std::vector<ZooKeeperQuorumServer*> getCluster(uint32_t numServers);
+ // "localhost:<client port>".
+ std::string getHostPort();
+ // CLIENT_PORT_BASE + id.
+ uint32_t getClientPort();
+ void start();
+ void stop();
+ // Role checks based on the output of "zkServer.sh status".
+ bool isLeader();
+ bool isFollower();
+ // The "server.<id>=..." config line for this server.
+ std::string getServerString();
+
+ private:
+ // NOTE(review): the default constructor, copy constructor and assignment
+ // operator are declared private, presumably to forbid their use - confirm.
+ ZooKeeperQuorumServer();
+ // Writes the config file and data directory, then starts the server.
+ ZooKeeperQuorumServer(uint32_t id, uint32_t numServers);
+ ZooKeeperQuorumServer(const ZooKeeperQuorumServer& that);
+ const ZooKeeperQuorumServer& operator=(const ZooKeeperQuorumServer& that);
+ void createConfigFile();
+ std::string getConfigFileName();
+ void createDataDirectory();
+ std::string getDataDirectory();
+ static std::string getServerString(uint32_t id);
+ // Returns "leader" or "follower".
+ std::string getMode();
+
+ // Each server's ports are these bases offset by its id.
+ static const uint32_t SERVER_PORT_BASE = 2000;
+ static const uint32_t ELECTION_PORT_BASE = 3000;
+ static const uint32_t CLIENT_PORT_BASE = 4000;
+
+ // Number of servers in the quorum this server belongs to.
+ uint32_t numServers_;
+ // This server's id; written to myid and used to derive its ports.
+ uint32_t id_;
+ // Value of the ZKROOT environment variable.
+ std::string root_;
+};
+
+#endif // ZOOKEEPER_QUORUM_SERVER_H
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java Thu Mar 7 06:01:49 2013
@@ -103,6 +103,10 @@ public abstract class KeeperException ex
return new UnimplementedException();
case OPERATIONTIMEOUT:
return new OperationTimeoutException();
+ case NEWCONFIGNOQUORUM:
+ return new NewConfigNoQuorum();
+ case RECONFIGINPROGRESS:
+ return new ReconfigInProgress();
case BADARGUMENTS:
return new BadArgumentsException();
case APIERROR:
@@ -277,10 +281,16 @@ public abstract class KeeperException ex
*/
@Deprecated
public static final int AuthFailed = -115;
- /**
- * This value will be used directly in {@link CODE#SESSIONMOVED}
- */
- // public static final int SessionMoved = -118;
+
+ // This value will be used directly in {@link CODE#SESSIONMOVED}
+ // public static final int SessionMoved = -118;
+
+ @Deprecated
+ public static final int NewConfigNoQuorum = -120;
+
+ @Deprecated
+ public static final int ReconfigInProgress= -121;
+
}
/** Codes which represent the various KeeperException
@@ -313,6 +323,11 @@ public abstract class KeeperException ex
OPERATIONTIMEOUT (OperationTimeout),
/** Invalid arguments */
BADARGUMENTS (BadArguments),
+ /** No quorum of new config is connected and up-to-date with the leader of last committed config - try
+ * invoking reconfiguration after new servers are connected and synced */
+ NEWCONFIGNOQUORUM (NewConfigNoQuorum),
+ /** Another reconfiguration is in progress -- concurrent reconfigs not supported (yet) */
+ RECONFIGINPROGRESS (ReconfigInProgress),
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
@@ -326,7 +341,8 @@ public abstract class KeeperException ex
NONODE (NoNode),
/** Not authenticated */
NOAUTH (NoAuth),
- /** Version conflict */
+ /** Version conflict
+ In case of reconfiguration: reconfig requested from config version X but last seen config has a different version Y */
BADVERSION (BadVersion),
/** Ephemeral nodes may not have children */
NOCHILDRENFOREPHEMERALS (NoChildrenForEphemerals),
@@ -390,6 +406,10 @@ public abstract class KeeperException ex
return "ConnectionLoss";
case MARSHALLINGERROR:
return "MarshallingError";
+ case NEWCONFIGNOQUORUM:
+ return "NewConfigNoQuorum";
+ case RECONFIGINPROGRESS:
+ return "ReconfigInProgress";
case UNIMPLEMENTED:
return "Unimplemented";
case OPERATIONTIMEOUT:
@@ -590,6 +610,24 @@ public abstract class KeeperException ex
}
/**
+ * @see Code#NEWCONFIGNOQUORUM
+ */
+ public static class NewConfigNoQuorum extends KeeperException {
+ public NewConfigNoQuorum() {
+ super(Code.NEWCONFIGNOQUORUM);
+ }
+ }
+
+ /**
+ * @see Code#RECONFIGINPROGRESS
+ */
+ public static class ReconfigInProgress extends KeeperException {
+ public ReconfigInProgress() {
+ super(Code.RECONFIGINPROGRESS);
+ }
+ }
+
+ /**
* @see Code#NOCHILDRENFOREPHEMERALS
*/
public static class NoChildrenForEphemeralsException extends KeeperException {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java Thu Mar 7 06:01:49 2013
@@ -25,6 +25,9 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
public class ZooDefs {
+
+ final public static String CONFIG_NODE = "/zookeeper/config";
+
public interface OpCode {
public final int notification = 0;
@@ -56,6 +59,8 @@ public class ZooDefs {
public final int create2 = 15;
+ public final int reconfig = 16;
+
public final int auth = 100;
public final int setWatches = 101;
@@ -117,5 +122,5 @@ public class ZooDefs {
final public static String[] opNames = { "notification", "create",
"delete", "exists", "getData", "setData", "getACL", "setACL",
- "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping" };
+ "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig" };
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Thu Mar 7 06:01:49 2013
@@ -18,28 +18,61 @@
package org.apache.zookeeper;
-import org.apache.zookeeper.AsyncCallback.*;
-import org.apache.zookeeper.ClientCnxn.SendThread;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zookeeper.AsyncCallback.ACLCallback;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.Create2Callback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.OpResult.ErrorResult;
import org.apache.zookeeper.client.ConnectStringParser;
-import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.StaticHostProvider;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.*;
+import org.apache.zookeeper.proto.Create2Request;
+import org.apache.zookeeper.proto.Create2Response;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.CreateResponse;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.ExistsRequest;
+import org.apache.zookeeper.proto.GetACLRequest;
+import org.apache.zookeeper.proto.GetACLResponse;
+import org.apache.zookeeper.proto.GetChildren2Request;
+import org.apache.zookeeper.proto.GetChildren2Response;
+import org.apache.zookeeper.proto.GetChildrenRequest;
+import org.apache.zookeeper.proto.GetChildrenResponse;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.proto.GetDataResponse;
+import org.apache.zookeeper.proto.ReconfigRequest;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetACLResponse;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SyncRequest;
+import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-
/**
* This is the main class of ZooKeeper client library. To use a ZooKeeper
* service, an application must first instantiate an object of ZooKeeper class.
@@ -1415,6 +1448,181 @@ public class ZooKeeper {
}
/**
+ * Return the last committed configuration (as known to the server to which the client is connected)
+ * and the stat of the configuration.
+ * <p>
+ * If the watch is non-null and the call is successful (no exception is
+ * thrown), a watch will be left on the configuration node (ZooDefs.CONFIG_NODE). The watch
+ * will be triggered by a successful reconfig operation
+ * <p>
+ * A KeeperException with error code KeeperException.NoNode will be thrown
+ * if the configuration node doesn't exists.
+ *
+ * @param watcher explicit watcher
+ * @param stat the stat of the configuration node ZooDefs.CONFIG_NODE
+ * @return configuration data stored in ZooDefs.CONFIG_NODE
+ * @throws KeeperException If the server signals an error with a non-zero error code
+ * @throws InterruptedException If the server transaction is interrupted.
+ */
+ public byte[] getConfig(Watcher watcher, Stat stat)
+ throws KeeperException, InterruptedException
+ {
+ final String configZnode = ZooDefs.CONFIG_NODE;
+
+ // the watch contains the un-chroot path
+ WatchRegistration wcb = null;
+ if (watcher != null) {
+ wcb = new DataWatchRegistration(watcher, configZnode);
+ }
+
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.getData);
+ GetDataRequest request = new GetDataRequest();
+ request.setPath(configZnode);
+ request.setWatch(watcher != null);
+ GetDataResponse response = new GetDataResponse();
+ ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
+ if (r.getErr() != 0) {
+ throw KeeperException.create(KeeperException.Code.get(r.getErr()),
+ configZnode);
+ }
+ if (stat != null) {
+ DataTree.copyStat(response.getStat(), stat);
+ }
+ return response.getData();
+ }
+
+    /**
+     * The asynchronous version of getConfig: the request is queued and the
+     * result is delivered to the given callback.
+     *
+     * @see #getConfig(Watcher, Stat)
+     */
+    public void getConfig(Watcher watcher,
+            DataCallback cb, Object ctx)
+    {
+        final String configZnode = ZooDefs.CONFIG_NODE;
+        final boolean watching = (watcher != null);
+
+        // the watch contains the un-chroot path
+        WatchRegistration registration = null;
+        if (watching) {
+            registration = new DataWatchRegistration(watcher, configZnode);
+        }
+
+        RequestHeader header = new RequestHeader();
+        header.setType(ZooDefs.OpCode.getData);
+
+        GetDataRequest request = new GetDataRequest();
+        request.setPath(configZnode);
+        request.setWatch(watching);
+
+        GetDataResponse response = new GetDataResponse();
+        cnxn.queuePacket(header, new ReplyHeader(), request, response, cb,
+                configZnode, configZnode, ctx, registration);
+    }
+
+
+    /**
+     * Return the last committed configuration (as known to the server to which the client is connected)
+     * and the stat of the configuration.
+     * <p>
+     * If the watch is true and the call is successful (no exception is
+     * thrown), a watch using the default watcher will be left on the
+     * configuration node (ZooDefs.CONFIG_NODE). The watch
+     * will be triggered by a successful reconfig operation.
+     * <p>
+     * A KeeperException with error code KeeperException.NoNode will be thrown
+     * if the configuration node doesn't exist.
+     *
+     * @param watch whether need to watch this node
+     * @param stat the stat of the configuration node ZooDefs.CONFIG_NODE
+     * @return configuration data stored in ZooDefs.CONFIG_NODE
+     * @throws KeeperException If the server signals an error with a non-zero error code
+     * @throws InterruptedException If the server transaction is interrupted.
+     */
+    public byte[] getConfig(boolean watch, Stat stat)
+            throws KeeperException, InterruptedException {
+        Watcher watcher = null;
+        if (watch) {
+            watcher = watchManager.defaultWatcher;
+        }
+        return getConfig(watcher, stat);
+    }
+
+ /**
+ * The Asynchronous version of getConfig. When watch is true the default
+ * watcher is used; otherwise no watcher is passed.
+ *
+ * @see #getConfig(boolean, Stat)
+ */
+ public void getConfig(boolean watch, DataCallback cb, Object ctx) {
+ getConfig(watch ? watchManager.defaultWatcher : null, cb, ctx);
+ }
+
+    /**
+     * Reconfigure - add/remove servers. Return the new configuration.
+     * @param joiningServers
+     *                a comma separated list of servers being added (incremental reconfiguration)
+     * @param leavingServers
+     *                a comma separated list of servers being removed (incremental reconfiguration)
+     * @param newMembers
+     *                a comma separated list of new membership (non-incremental reconfiguration)
+     * @param fromConfig
+     *                version of the current configuration (optional - causes reconfiguration to throw an exception if configuration is no longer current)
+     * @param stat
+     *                if not null, receives a copy of the stat returned with the new configuration
+     * @return new configuration
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws KeeperException If the server signals an error with a non-zero error code.
+     */
+    public byte[] reconfig(String joiningServers, String leavingServers, String newMembers, long fromConfig, Stat stat) throws KeeperException, InterruptedException
+    {
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.reconfig);
+        ReconfigRequest request = new ReconfigRequest(joiningServers, leavingServers, newMembers, fromConfig);
+        GetDataResponse response = new GetDataResponse();
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()), "");
+        }
+        // stat is optional, as in getConfig(Watcher, Stat): copy only when
+        // the caller supplied an object to receive it.
+        if (stat != null) {
+            DataTree.copyStat(response.getStat(), stat);
+        }
+        return response.getData();
+    }
+
+    /**
+     * Convenience wrapper around reconfig that takes Lists of strings instead of comma-separated servers.
+     * Each list is joined with "," and passed to the String-based overload.
+     *
+     * @see #reconfig
+     *
+     */
+    public byte[] reconfig(List<String> joiningServers, List<String> leavingServers, List<String> newMembers, long fromConfig, Stat stat) throws KeeperException, InterruptedException
+    {
+        final String joining = StringUtils.joinStrings(joiningServers, ",");
+        final String leaving = StringUtils.joinStrings(leavingServers, ",");
+        final String members = StringUtils.joinStrings(newMembers, ",");
+        return reconfig(joining, leaving, members, fromConfig, stat);
+    }
+
+    /**
+     * The Asynchronous version of reconfig: the request is queued and the
+     * result is delivered to the given callback.
+     *
+     * @see #reconfig
+     *
+     **/
+    public void reconfig(String joiningServers, String leavingServers, String newMembers, long fromConfig, DataCallback cb, Object ctx) throws KeeperException, InterruptedException
+    {
+        final String configZnode = ZooDefs.CONFIG_NODE;
+
+        RequestHeader header = new RequestHeader();
+        header.setType(ZooDefs.OpCode.reconfig);
+
+        ReconfigRequest request = new ReconfigRequest(joiningServers, leavingServers, newMembers, fromConfig);
+        GetDataResponse response = new GetDataResponse();
+
+        cnxn.queuePacket(header, new ReplyHeader(), request, response, cb,
+                configZnode, configZnode, ctx, null);
+    }
+
+    /**
+     * Convenience wrapper around asynchronous reconfig that takes Lists of strings instead of comma-separated servers.
+     * Each list is joined with "," and passed to the String-based overload.
+     *
+     * @see #reconfig
+     *
+     */
+    public void reconfig(List<String> joiningServers, List<String> leavingServers, List<String> newMembers, long fromConfig, DataCallback cb, Object ctx) throws KeeperException, InterruptedException
+    {
+        final String joining = StringUtils.joinStrings(joiningServers, ",");
+        final String leaving = StringUtils.joinStrings(leavingServers, ",");
+        final String members = StringUtils.joinStrings(newMembers, ",");
+        reconfig(joining, leaving, members, fromConfig, cb, ctx);
+    }
+
+ /**
* Set the data for the node of the given path if such a node exists and the
* given version matches the version of the node (if the given version is
* -1, it matches any node's versions). Return the stat of the node.
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Thu Mar 7 06:01:49 2013
@@ -49,9 +49,11 @@ import org.apache.zookeeper.cli.DeleteAl
import org.apache.zookeeper.cli.DeleteCommand;
import org.apache.zookeeper.cli.GetAclCommand;
import org.apache.zookeeper.cli.GetCommand;
+import org.apache.zookeeper.cli.GetConfigCommand;
import org.apache.zookeeper.cli.ListQuotaCommand;
import org.apache.zookeeper.cli.Ls2Command;
import org.apache.zookeeper.cli.LsCommand;
+import org.apache.zookeeper.cli.ReconfigCommand;
import org.apache.zookeeper.cli.SetAclCommand;
import org.apache.zookeeper.cli.SetCommand;
import org.apache.zookeeper.cli.SetQuotaCommand;
@@ -105,6 +107,8 @@ public class ZooKeeperMain {
new ListQuotaCommand().addToMap(commandMapCli);
new DelQuotaCommand().addToMap(commandMapCli);
new AddAuthCommand().addToMap(commandMapCli);
+ new ReconfigCommand().addToMap(commandMapCli);
+ new GetConfigCommand().addToMap(commandMapCli);
// add all to commandMap
for (Entry<String, CliCommand> entry : commandMapCli.entrySet()) {
@@ -583,6 +587,13 @@ public class ZooKeeperMain {
System.err.println("Arguments are not valid : "+e.getPath());
}catch (KeeperException.BadVersionException e) {
System.err.println("version No is not valid : "+e.getPath());
+ }catch (KeeperException.ReconfigInProgress e) {
+ System.err.println("Another reconfiguration is in progress -- concurrent " +
+ "reconfigs not supported (yet)");
+ }catch (KeeperException.NewConfigNoQuorum e) {
+ System.err.println("No quorum of new config is connected and " +
+ "up-to-date with the leader of last commmitted config - try invoking reconfiguration after " +
+ "new servers are connected and synced");
}
return false;
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/GetConfigCommand.java Thu Mar 7 06:01:49 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.cli;
+
+import org.apache.commons.cli.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.util.ConfigUtils;
+
+/**
+ * config command for cli.
+ *
+ * Prints the data returned by zk.getConfig. Supported flags:
+ *   -s  also print the Stat filled in by getConfig
+ *   -w  pass watch=true to getConfig
+ *   -c  print the result of ConfigUtils.getClientConfigStr on the data
+ *       instead of the raw data
+ */
+public class GetConfigCommand extends CliCommand {
+
+    private static Options options = new Options();
+    private String args[];
+    private CommandLine cl;
+
+    // The option table is static, so it is populated once when the class is
+    // loaded rather than being re-registered on every instantiation.
+    static {
+        options.addOption("s", false, "stats");
+        options.addOption("w", false, "watch");
+        options.addOption("c", false, "client connection string");
+    }
+
+    public GetConfigCommand() {
+        super("config", "[-c] [-w] [-s]");
+    }
+
+    /**
+     * Parses the command line with a PosixParser and remembers both the
+     * parsed CommandLine and its leftover (non-option) arguments.
+     *
+     * @throws ParseException with the usage string when no leftover
+     *         argument is present, or when the parser rejects the input
+     */
+    @Override
+    public CliCommand parse(String[] cmdArgs) throws ParseException {
+
+        Parser parser = new PosixParser();
+        cl = parser.parse(options, cmdArgs);
+        args = cl.getArgs();
+        if (args.length < 1) {
+            throw new ParseException(getUsageStr());
+        }
+
+        return this;
+    }
+
+    /**
+     * Fetches the configuration and prints it to out.
+     *
+     * @return true when the -w flag was given, false otherwise
+     */
+    @Override
+    public boolean exec() throws KeeperException, InterruptedException {
+        boolean watch = cl.hasOption("w");
+        Stat stat = new Stat();
+        byte data[] = zk.getConfig(watch, stat);
+        // A missing payload is rendered as the literal text "null".
+        data = (data == null) ? "null".getBytes() : data;
+        if (cl.hasOption("c")) {
+            out.println(ConfigUtils.getClientConfigStr(new String(data)));
+        } else {
+            out.println(new String(data));
+        }
+
+        if (cl.hasOption("s")) {
+            new StatPrinter(out).print(stat);
+        }
+
+        return watch;
+    }
+}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/ReconfigCommand.java Thu Mar 7 06:01:49 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.cli;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.cli.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+/**
+ * reconfig command for cli.
+ *
+ * Two mutually exclusive modes are accepted:
+ *   incremental     - -add and/or -remove
+ *   non-incremental - exactly one of -file or -members
+ * -v optionally supplies the expected current config version (parsed as a
+ * hexadecimal long); -s prints the Stat returned by the reconfig call.
+ */
+public class ReconfigCommand extends CliCommand {
+
+    private static Options options = new Options();
+
+    /* joining - comma separated list of server config strings for servers to be added to the ensemble.
+     * Each entry is identical in syntax as it would appear in a configuration file. Only used for
+     * incremental reconfigurations.
+     */
+    private String joining;
+
+    /* leaving - comma separated list of server IDs to be removed from the ensemble. Only used for
+     * incremental reconfigurations.
+     */
+    private String leaving;
+
+    /* members - comma separated list of new membership information (e.g., contents of a membership
+     * configuration file) - for use only with a non-incremental reconfiguration. This may be specified
+     * manually via the -members flag or it will automatically be filled in by reading the contents
+     * of an actual configuration file using the -file flag.
+     */
+    private String members;
+
+    /* version - version of config from which we want to reconfigure - if current config is different
+     * reconfiguration will fail. Should be omitted from the CLI to disable this option.
+     */
+    long version = -1;
+    private CommandLine cl;
+
+    // The option table is static, so it is populated once when the class is
+    // loaded rather than being re-registered on every instantiation.
+    static {
+        options.addOption("s", false, "stats");
+        options.addOption("v", true, "required current config version");
+        options.addOption("file", true, "path of config file to parse for membership");
+        options.addOption("members", true, "comma-separated list of config strings for " +
+                "non-incremental reconfig");
+        options.addOption("add", true, "comma-separated list of config strings for " +
+                "new servers");
+        options.addOption("remove", true, "comma-separated list of server IDs to remove");
+    }
+
+    public ReconfigCommand() {
+        super("reconfig", "[-s] " +
+                "[-v version] " +
+                "[[-file path] | " +
+                "[-members serverID=host:port1:port2;port3[,...]*]] | " +
+                "[-add serverId=host:port1:port2;port3[,...]]* " +
+                "[-remove serverId[,...]*]");
+    }
+
+    /**
+     * Parses the reconfig command line and fills in joining / leaving /
+     * members / version for a later exec().
+     *
+     * @throws ParseException when no mode flag is given, when -v is not a
+     *         hexadecimal long, when incremental and non-incremental flags
+     *         are mixed, when -file and -members are combined, or when the
+     *         file given with -file cannot be read or parsed
+     */
+    @Override
+    public CliCommand parse(String[] cmdArgs) throws ParseException {
+        joining = null;
+        leaving = null;
+        members = null;
+        Parser parser = new PosixParser();
+        cl = parser.parse(options, cmdArgs);
+        // At least one mode flag is required. -members must be accepted here
+        // too, otherwise "reconfig -members ..." alone is rejected as a usage error.
+        if (!cl.hasOption("file") && !cl.hasOption("members")
+                && !cl.hasOption("add") && !cl.hasOption("remove")) {
+            throw new ParseException(getUsageStr());
+        }
+        if (cl.hasOption("v")) {
+            try {
+                // the version is given in hexadecimal
+                version = Long.parseLong(cl.getOptionValue("v"), 16);
+            } catch (NumberFormatException e) {
+                throw new ParseException("-v must be followed by a long (configuration version)");
+            }
+        } else {
+            version = -1;
+        }
+
+        // Simple error checking for conflicting modes
+        if ((cl.hasOption("file") || cl.hasOption("members")) && (cl.hasOption("add") || cl.hasOption("remove"))) {
+            throw new ParseException("Can't use -file or -members together with -add or -remove (mixing incremental" +
+                    " and non-incremental modes is not allowed)");
+        }
+        if (cl.hasOption("file") && cl.hasOption("members"))
+        {
+            throw new ParseException("Can't use -file and -members together (conflicting non-incremental modes)");
+        }
+
+        // Set the joining/leaving/members values based on the mode we're in
+        if (cl.hasOption("add")) {
+            joining = cl.getOptionValue("add").toLowerCase();
+        }
+        if (cl.hasOption("remove")) {
+            leaving = cl.getOptionValue("remove").toLowerCase();
+        }
+        if (cl.hasOption("members")) {
+            members = cl.getOptionValue("members").toLowerCase();
+        }
+        if (cl.hasOption("file")) {
+            String path = cl.getOptionValue("file");
+            try {
+                Properties dynamicCfg = new Properties();
+                // Nothing else is allocated between opening the stream and
+                // entering the try/finally, so the stream is always closed.
+                FileInputStream inConfig = new FileInputStream(path);
+                try {
+                    dynamicCfg.load(inConfig);
+                } finally {
+                    inConfig.close();
+                }
+                //check that membership makes sense; leader will make these checks again
+                //don't check for leader election ports since
+                //client doesn't know what leader election alg is used
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseDynamicConfig(dynamicCfg, 0, true);
+                members = config.getQuorumVerifier().toString();
+            } catch (Exception e) {
+                throw new ParseException("Error processing " + path + ": " + e.getMessage());
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Issues the reconfig call with the values collected by parse() and
+     * prints the configuration it returns; a KeeperException is reported on
+     * err instead of being propagated.
+     *
+     * @return always false
+     */
+    @Override
+    public boolean exec() throws KeeperException, InterruptedException {
+        try {
+            Stat stat = new Stat();
+            byte[] curConfig = zk.reconfig(joining,
+                    leaving, members, version, stat);
+            out.println("Committed new configuration:\n" + new String(curConfig));
+
+            if (cl.hasOption("s")) {
+                new StatPrinter(out).print(stat);
+            }
+        } catch (KeeperException ex) {
+            err.println(ex.getMessage());
+        }
+        return false;
+    }
+}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/common/StringUtils.java Thu Mar 7 06:01:49 2013
@@ -41,4 +41,23 @@ public class StringUtils {
}
return Collections.unmodifiableList(results);
}
+
+ /**
+ * This method takes a List<String> and a delimiter and joins the strings
+ * into a single string, where the original strings are separated using
+ * the given delimiter.
+ *
+ */
+ public static String joinStrings(List<String> list, String delim)
+ {
+ if (list == null)
+ return null;
+
+ StringBuilder builder = new StringBuilder(list.get(0));
+ for (String s : list.subList(1, list.size())) {
+ builder.append(delim).append(s);
+ }
+
+ return builder.toString();
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Thu Mar 7 06:01:49 2013
@@ -43,6 +43,7 @@ import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.StatsTrack;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -107,6 +108,16 @@ public class DataTree {
.substring(procZookeeper.length() + 1);
/**
+ * the zookeeper config node that acts as the config management node for
+ * zookeeper
+ */
+ private static final String configZookeeper = ZooDefs.CONFIG_NODE;
+
+ /** this will be the string that's stored as a child of /zookeeper */
+ private static final String configChildZookeeper = configZookeeper
+ .substring(procZookeeper.length() + 1);
+
+ /**
* the path trie that keeps track fo the quota nodes in this datatree
*/
private final PathTrie pTrie = new PathTrie();
@@ -254,6 +265,13 @@ public class DataTree {
*/
private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
+ /**
+ * create a /zookeeper/config node for maintaining the configuration (membership and quorum system) info for
+ * zookeeper
+ */
+ private DataNode configDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
+
+
public DataTree() {
/* Rather than fight it, let root have an alias */
nodes.put("", root);
@@ -265,8 +283,20 @@ public class DataTree {
procDataNode.addChild(quotaChildZookeeper);
nodes.put(quotaZookeeper, quotaDataNode);
+
+ addConfigNode();
}
+ /**
+  * Registers the config znode in this tree: the node stored under
+  * procZookeeper gets configChildZookeeper added as a child (an error is
+  * logged if that node is missing), and configDataNode is then stored in
+  * nodes under configZookeeper in either case.
+  */
+ public void addConfigNode() {
+     DataNode parent = nodes.get(procZookeeper);
+     if (parent == null) {
+         // not expected: the parent should always be present
+         LOG.error("There's no /zookeeper znode - this should never happen");
+     } else {
+         parent.addChild(configChildZookeeper);
+     }
+     nodes.put(configZookeeper, configDataNode);
+ }
+
/**
* is the path one of the special paths owned by zookeeper.
*
@@ -276,7 +306,7 @@ public class DataTree {
*/
boolean isSpecialPath(String path) {
if (rootZookeeper.equals(path) || procZookeeper.equals(path)
- || quotaZookeeper.equals(path)) {
+ || quotaZookeeper.equals(path) || configZookeeper.equals(path)) {
return true;
}
return false;
@@ -798,6 +828,7 @@ public class DataTree {
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
+ case OpCode.reconfig:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Mar 7 06:01:49 2013
@@ -53,7 +53,7 @@ import org.apache.zookeeper.proto.SyncRe
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
-import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.TxnHeader;
@@ -237,6 +237,12 @@ public class FinalRequestProcessor imple
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
+ }
+ case OpCode.reconfig: {
+ lastOp = "RECO";
+ rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
+ err = Code.get(rc.err);
+ break;
}
case OpCode.setACL: {
lastOp = "SETA";